View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.nio.ByteBuffer;
22  import java.util.concurrent.atomic.AtomicReference;
23  
24  import org.eclipse.jetty.client.api.ContentProvider;
25  import org.eclipse.jetty.client.api.Request;
26  import org.eclipse.jetty.client.api.Result;
27  import org.eclipse.jetty.http.HttpHeader;
28  import org.eclipse.jetty.http.HttpHeaderValue;
29  import org.eclipse.jetty.util.BufferUtil;
30  import org.eclipse.jetty.util.Callback;
31  import org.eclipse.jetty.util.IteratingCallback;
32  import org.eclipse.jetty.util.log.Log;
33  import org.eclipse.jetty.util.log.Logger;
34  
35  /**
36   * {@link HttpSender} abstracts the algorithm to send HTTP requests, so that subclasses only implement
37   * the transport-specific code to send requests over the wire, implementing
38   * {@link #sendHeaders(HttpExchange, HttpContent, Callback)} and
39   * {@link #sendContent(HttpExchange, HttpContent, Callback)}.
40   * <p>
41   * {@link HttpSender} governs two state machines.
42   * <p>
43   * The request state machine is updated by {@link HttpSender} as the various steps of sending a request
44   * are executed, see <code>RequestState</code>.
45   * At any point in time, a user thread may abort the request, which may (if the request has not been
46   * completely sent yet) move the request state machine to <code>RequestState#FAILURE</code>.
47   * The request state machine guarantees that the request steps are executed (by I/O threads) only if
48   * the request has not been failed already.
49   * <p>
50   * The sender state machine is updated by {@link HttpSender} from three sources: deferred content notifications
51   * (via {@link #onContent()}), 100-continue notifications (via {@link #proceed(HttpExchange, Throwable)})
52   * and normal request send (via {@link #sendContent(HttpExchange, HttpContent, Callback)}).
53   * This state machine must guarantee that the request sending is never executed concurrently: only one of
54   * those sources may trigger the call to {@link #sendContent(HttpExchange, HttpContent, Callback)}.
55   *
56   * @see HttpReceiver
57   */
58  public abstract class HttpSender implements AsyncContentProvider.Listener
59  {
60      protected static final Logger LOG = Log.getLogger(HttpSender.class);
61  
62      private final AtomicReference<RequestState> requestState = new AtomicReference<>(RequestState.QUEUED);
63      private final AtomicReference<SenderState> senderState = new AtomicReference<>(SenderState.IDLE);
64      private final Callback commitCallback = new CommitCallback();
65      private final IteratingCallback contentCallback = new ContentCallback();
66      private final Callback lastCallback = new LastContentCallback();
67      private final HttpChannel channel;
68      private HttpContent content;
69      private Throwable failure;
70  
71      protected HttpSender(HttpChannel channel)
72      {
73          this.channel = channel;
74      }
75  
76      protected HttpChannel getHttpChannel()
77      {
78          return channel;
79      }
80  
81      protected HttpExchange getHttpExchange()
82      {
83          return channel.getHttpExchange();
84      }
85  
86      @Override
87      public void onContent()
88      {
89          HttpExchange exchange = getHttpExchange();
90          if (exchange == null)
91              return;
92  
93          while (true)
94          {
95              SenderState current = senderState.get();
96              switch (current)
97              {
98                  case IDLE:
99                  {
100                     SenderState newSenderState = SenderState.SENDING;
101                     if (updateSenderState(current, newSenderState))
102                     {
103                         if (LOG.isDebugEnabled())
104                             LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
105                         contentCallback.iterate();
106                         return;
107                     }
108                     break;
109                 }
110                 case SENDING:
111                 {
112                     SenderState newSenderState = SenderState.SENDING_WITH_CONTENT;
113                     if (updateSenderState(current, newSenderState))
114                     {
115                         if (LOG.isDebugEnabled())
116                             LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
117                         return;
118                     }
119                     break;
120                 }
121                 case EXPECTING:
122                 {
123                     SenderState newSenderState = SenderState.EXPECTING_WITH_CONTENT;
124                     if (updateSenderState(current, newSenderState))
125                     {
126                         if (LOG.isDebugEnabled())
127                             LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
128                         return;
129                     }
130                     break;
131                 }
132                 case PROCEEDING:
133                 {
134                     SenderState newSenderState = SenderState.PROCEEDING_WITH_CONTENT;
135                     if (updateSenderState(current, newSenderState))
136                     {
137                         if (LOG.isDebugEnabled())
138                             LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
139                         return;
140                     }
141                     break;
142                 }
143                 case SENDING_WITH_CONTENT:
144                 case EXPECTING_WITH_CONTENT:
145                 case PROCEEDING_WITH_CONTENT:
146                 case WAITING:
147                 case COMPLETED:
148                 case FAILED:
149                 {
150                     if (LOG.isDebugEnabled())
151                         LOG.debug("Deferred content available, {}", current);
152                     return;
153                 }
154                 default:
155                 {
156                     illegalSenderState(current);
157                     return;
158                 }
159             }
160         }
161     }
162 
163     public void send(HttpExchange exchange)
164     {
165         if (!queuedToBegin(exchange))
166             return;
167 
168         Request request = exchange.getRequest();
169         ContentProvider contentProvider = request.getContent();
170         HttpContent content = this.content = new HttpContent(contentProvider);
171 
172         SenderState newSenderState = SenderState.SENDING;
173         if (expects100Continue(request))
174             newSenderState = content.hasContent() ? SenderState.EXPECTING_WITH_CONTENT : SenderState.EXPECTING;
175 
176         out: while (true)
177         {
178             SenderState current = senderState.get();
179             switch (current)
180             {
181                 case IDLE:
182                 case COMPLETED:
183                 {
184                     if (updateSenderState(current, newSenderState))
185                         break out;
186                     break;
187                 }
188                 default:
189                 {
190                     illegalSenderState(current);
191                     return;
192                 }
193             }
194         }
195 
196         // Setting the listener may trigger calls to onContent() by other
197         // threads so we must set it only after the sender state has been updated
198         if (contentProvider instanceof AsyncContentProvider)
199             ((AsyncContentProvider)contentProvider).setListener(this);
200 
201         if (!beginToHeaders(exchange))
202             return;
203 
204         sendHeaders(exchange, content, commitCallback);
205     }
206 
207     protected boolean expects100Continue(Request request)
208     {
209         return request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
210     }
211 
212     protected boolean queuedToBegin(HttpExchange exchange)
213     {
214         if (!updateRequestState(RequestState.QUEUED, RequestState.TRANSIENT))
215             return false;
216 
217         Request request = exchange.getRequest();
218         if (LOG.isDebugEnabled())
219             LOG.debug("Request begin {}", request);
220         RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
221         notifier.notifyBegin(request);
222 
223         if (updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN))
224             return true;
225 
226         terminateRequest(exchange);
227         return false;
228     }
229 
230     protected boolean beginToHeaders(HttpExchange exchange)
231     {
232         if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT))
233             return false;
234 
235         Request request = exchange.getRequest();
236         if (LOG.isDebugEnabled())
237             LOG.debug("Request headers {}{}{}", request, System.lineSeparator(), request.getHeaders().toString().trim());
238         RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
239         notifier.notifyHeaders(request);
240 
241         if (updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
242             return true;
243 
244         terminateRequest(exchange);
245         return false;
246     }
247 
248     protected boolean headersToCommit(HttpExchange exchange)
249     {
250         if (!updateRequestState(RequestState.HEADERS, RequestState.TRANSIENT))
251             return false;
252 
253         Request request = exchange.getRequest();
254         if (LOG.isDebugEnabled())
255             LOG.debug("Request committed {}", request);
256         RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
257         notifier.notifyCommit(request);
258 
259         if (updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT))
260             return true;
261 
262         terminateRequest(exchange);
263         return false;
264     }
265 
266     protected boolean someToContent(HttpExchange exchange, ByteBuffer content)
267     {
268         RequestState current = requestState.get();
269         switch (current)
270         {
271             case COMMIT:
272             case CONTENT:
273             {
274                 if (!updateRequestState(current, RequestState.TRANSIENT))
275                     return false;
276 
277                 Request request = exchange.getRequest();
278                 if (LOG.isDebugEnabled())
279                     LOG.debug("Request content {}{}{}", request, System.lineSeparator(), BufferUtil.toDetailString(content));
280                 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
281                 notifier.notifyContent(request, content);
282 
283                 if (updateRequestState(RequestState.TRANSIENT, RequestState.CONTENT))
284                     return true;
285 
286                 terminateRequest(exchange);
287                 return false;
288             }
289             default:
290             {
291                 return false;
292             }
293         }
294     }
295 
296     protected boolean someToSuccess(HttpExchange exchange)
297     {
298         RequestState current = requestState.get();
299         switch (current)
300         {
301             case COMMIT:
302             case CONTENT:
303             {
304                 // Mark atomically the request as completed, with respect
305                 // to concurrency between request success and request failure.
306                 if (!exchange.requestComplete(null))
307                     return false;
308 
309                 requestState.set(RequestState.QUEUED);
310 
311                 // Reset to be ready for another request.
312                 reset();
313 
314                 Request request = exchange.getRequest();
315                 if (LOG.isDebugEnabled())
316                     LOG.debug("Request success {}", request);
317                 HttpDestination destination = getHttpChannel().getHttpDestination();
318                 destination.getRequestNotifier().notifySuccess(exchange.getRequest());
319 
320                 // Mark atomically the request as terminated, with
321                 // respect to concurrency between request and response.
322                 Result result = exchange.terminateRequest();
323                 terminateRequest(exchange, null, result);
324                 return true;
325             }
326             default:
327             {
328                 return false;
329             }
330         }
331     }
332 
333     protected boolean anyToFailure(Throwable failure)
334     {
335         HttpExchange exchange = getHttpExchange();
336         if (exchange == null)
337             return false;
338 
339         // Mark atomically the request as completed, with respect
340         // to concurrency between request success and request failure.
341         if (exchange.requestComplete(failure))
342             return abort(exchange, failure);
343 
344         return false;
345     }
346 
347     private void terminateRequest(HttpExchange exchange)
348     {
349         // In abort(), the state is updated before the failure is recorded
350         // to avoid to overwrite it, so here we may read a null failure.
351         Throwable failure = this.failure;
352         if (failure == null)
353             failure = new HttpRequestException("Concurrent failure", exchange.getRequest());
354         Result result = exchange.terminateRequest();
355         terminateRequest(exchange, failure, result);
356     }
357 
358     private void terminateRequest(HttpExchange exchange, Throwable failure, Result result)
359     {
360         Request request = exchange.getRequest();
361 
362         if (LOG.isDebugEnabled())
363             LOG.debug("Terminating request {}", request);
364 
365         if (result == null)
366         {
367             if (failure != null)
368             {
369                 if (exchange.responseComplete(failure))
370                 {
371                     if (LOG.isDebugEnabled())
372                         LOG.debug("Response failure from request {} {}", request, exchange);
373                     getHttpChannel().abortResponse(exchange, failure);
374                 }
375             }
376         }
377         else
378         {
379             HttpDestination destination = getHttpChannel().getHttpDestination();
380             boolean ordered = destination.getHttpClient().isStrictEventOrdering();
381             if (!ordered)
382                 channel.exchangeTerminated(exchange, result);
383             if (LOG.isDebugEnabled())
384                 LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
385             HttpConversation conversation = exchange.getConversation();
386             destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
387             if (ordered)
388                 channel.exchangeTerminated(exchange, result);
389         }
390     }
391 
392     /**
393      * Implementations should send the HTTP headers over the wire, possibly with some content,
394      * in a single write, and notify the given {@code callback} of the result of this operation.
395      * <p>
396      * If there is more content to send, then {@link #sendContent(HttpExchange, HttpContent, Callback)}
397      * will be invoked.
398      *
399      * @param exchange the exchange to send
400      * @param content the content to send
401      * @param callback the callback to notify
402      */
403     protected abstract void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback);
404 
405     /**
406      * Implementations should send the content at the {@link HttpContent} cursor position over the wire.
407      * <p>
408      * The {@link HttpContent} cursor is advanced by {@link HttpSender} at the right time, and if more
409      * content needs to be sent, this method is invoked again; subclasses need only to send the content
410      * at the {@link HttpContent} cursor position.
411      * <p>
412      * This method is invoked one last time when {@link HttpContent#isConsumed()} is true and therefore
413      * there is no actual content to send.
414      * This is done to allow subclasses to write "terminal" bytes (such as the terminal chunk when the
415      * transfer encoding is chunked) if their protocol needs to.
416      *
417      * @param exchange the exchange to send
418      * @param content the content to send
419      * @param callback the callback to notify
420      */
421     protected abstract void sendContent(HttpExchange exchange, HttpContent content, Callback callback);
422 
423     protected void reset()
424     {
425         HttpContent content = this.content;
426         this.content = null;
427         content.close();
428         senderState.set(SenderState.COMPLETED);
429     }
430 
431     protected void dispose()
432     {
433         HttpContent content = this.content;
434         this.content = null;
435         if (content != null)
436             content.close();
437         senderState.set(SenderState.FAILED);
438     }
439 
440     public void proceed(HttpExchange exchange, Throwable failure)
441     {
442         if (!expects100Continue(exchange.getRequest()))
443             return;
444 
445         if (failure != null)
446         {
447             anyToFailure(failure);
448             return;
449         }
450 
451         while (true)
452         {
453             SenderState current = senderState.get();
454             switch (current)
455             {
456                 case EXPECTING:
457                 {
458                     // We are still sending the headers, but we already got the 100 Continue.
459                     if (updateSenderState(current, SenderState.PROCEEDING))
460                     {
461                         if (LOG.isDebugEnabled())
462                             LOG.debug("Proceeding while expecting");
463                         return;
464                     }
465                     break;
466                 }
467                 case EXPECTING_WITH_CONTENT:
468                 {
469                     // More deferred content was submitted to onContent(), we already
470                     // got the 100 Continue, but we may be still sending the headers
471                     // (for example, with SSL we may have sent the encrypted data,
472                     // received the 100 Continue but not yet updated the decrypted
473                     // WriteFlusher so sending more content now may result in a
474                     // WritePendingException).
475                     if (updateSenderState(current, SenderState.PROCEEDING_WITH_CONTENT))
476                     {
477                         if (LOG.isDebugEnabled())
478                             LOG.debug("Proceeding while scheduled");
479                         return;
480                     }
481                     break;
482                 }
483                 case WAITING:
484                 {
485                     // We received the 100 Continue, now send the content if any.
486                     if (updateSenderState(current, SenderState.SENDING))
487                     {
488                         if (LOG.isDebugEnabled())
489                             LOG.debug("Proceeding while waiting");
490                         contentCallback.iterate();
491                         return;
492                     }
493                     break;
494                 }
495                 case FAILED:
496                 {
497                     return;
498                 }
499                 default:
500                 {
501                     illegalSenderState(current);
502                     return;
503                 }
504             }
505         }
506     }
507 
508     public boolean abort(HttpExchange exchange, Throwable failure)
509     {
510         // Update the state to avoid more request processing.
511         boolean terminate;
512         out: while (true)
513         {
514             RequestState current = requestState.get();
515             switch (current)
516             {
517                 case FAILURE:
518                 {
519                     return false;
520                 }
521                 default:
522                 {
523                     if (updateRequestState(current, RequestState.FAILURE))
524                     {
525                         terminate = current != RequestState.TRANSIENT;
526                         break out;
527                     }
528                     break;
529                 }
530             }
531         }
532 
533         this.failure = failure;
534 
535         dispose();
536 
537         Request request = exchange.getRequest();
538         if (LOG.isDebugEnabled())
539             LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure);
540         HttpDestination destination = getHttpChannel().getHttpDestination();
541         destination.getRequestNotifier().notifyFailure(request, failure);
542 
543         if (terminate)
544         {
545             // Mark atomically the request as terminated, with
546             // respect to concurrency between request and response.
547             Result result = exchange.terminateRequest();
548             terminateRequest(exchange, failure, result);
549         }
550         else
551         {
552             if (LOG.isDebugEnabled())
553                 LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
554         }
555 
556         return true;
557     }
558 
559     private boolean updateRequestState(RequestState from, RequestState to)
560     {
561         boolean updated = requestState.compareAndSet(from, to);
562         if (!updated && LOG.isDebugEnabled())
563             LOG.debug("RequestState update failed: {} -> {}: {}", from, to, requestState.get());
564         return updated;
565     }
566 
567     private boolean updateSenderState(SenderState from, SenderState to)
568     {
569         boolean updated = senderState.compareAndSet(from, to);
570         if (!updated && LOG.isDebugEnabled())
571             LOG.debug("SenderState update failed: {} -> {}: {}", from, to, senderState.get());
572         return updated;
573     }
574 
575     private void illegalSenderState(SenderState current)
576     {
577         anyToFailure(new IllegalStateException("Expected " + current + " found " + senderState.get() + " instead"));
578     }
579 
580     @Override
581     public String toString()
582     {
583         return String.format("%s@%x(req=%s,snd=%s,failure=%s)",
584                 getClass().getSimpleName(),
585                 hashCode(),
586                 requestState,
587                 senderState,
588                 failure);
589     }
590 
591     /**
592      * The request states {@link HttpSender} goes through when sending a request.
593      */
594     private enum RequestState
595     {
596         /**
597          * One of the state transition methods is being executed.
598          */
599         TRANSIENT,
600         /**
601          * The request is queued, the initial state
602          */
603         QUEUED,
604         /**
605          * The request has been dequeued
606          */
607         BEGIN,
608         /**
609          * The request headers (and possibly some content) is about to be sent
610          */
611         HEADERS,
612         /**
613          * The request headers (and possibly some content) have been sent
614          */
615         COMMIT,
616         /**
617          * The request content is being sent
618          */
619         CONTENT,
620         /**
621          * The request is failed
622          */
623         FAILURE
624     }
625 
626     /**
627      * The sender states {@link HttpSender} goes through when sending a request.
628      */
629     private enum SenderState
630     {
631         /**
632          * {@link HttpSender} is not sending request headers nor request content
633          */
634         IDLE,
635         /**
636          * {@link HttpSender} is sending the request header or request content
637          */
638         SENDING,
639         /**
640          * {@link HttpSender} is currently sending the request, and deferred content is available to be sent
641          */
642         SENDING_WITH_CONTENT,
643         /**
644          * {@link HttpSender} is sending the headers but will wait for 100 Continue before sending the content
645          */
646         EXPECTING,
647         /**
648          * {@link HttpSender} is currently sending the headers, will wait for 100 Continue, and deferred content is available to be sent
649          */
650         EXPECTING_WITH_CONTENT,
651         /**
652          * {@link HttpSender} has sent the headers and is waiting for 100 Continue
653          */
654         WAITING,
655         /**
656          * {@link HttpSender} is sending the headers, while 100 Continue has arrived
657          */
658         PROCEEDING,
659         /**
660          * {@link HttpSender} is sending the headers, while 100 Continue has arrived, and deferred content is available to be sent
661          */
662         PROCEEDING_WITH_CONTENT,
663         /**
664          * {@link HttpSender} has finished to send the request
665          */
666         COMPLETED,
667         /**
668          * {@link HttpSender} has failed to send the request
669          */
670         FAILED
671     }
672 
673     private class CommitCallback implements Callback
674     {
675 
676         @Override
677         public boolean isNonBlocking()
678         {
679             return content.isNonBlocking();
680         }
681 
682         @Override
683         public void succeeded()
684         {
685             try
686             {
687                 HttpContent content = HttpSender.this.content;
688                 if (content == null)
689                     return;
690                 content.succeeded();
691                 process();
692             }
693             catch (Throwable x)
694             {
695                 anyToFailure(x);
696             }
697         }
698 
699         @Override
700         public void failed(Throwable failure)
701         {
702             HttpContent content = HttpSender.this.content;
703             if (content == null)
704                 return;
705             content.failed(failure);
706             anyToFailure(failure);
707         }
708 
709         private void process() throws Exception
710         {
711             HttpExchange exchange = getHttpExchange();
712             if (exchange == null)
713                 return;
714 
715             if (!headersToCommit(exchange))
716                 return;
717 
718              HttpContent content = HttpSender.this.content;
719             if (content == null)
720                 return;
721 
722             if (!content.hasContent())
723             {
724                 // No content to send, we are done.
725                 someToSuccess(exchange);
726             }
727             else
728             {
729                 // Was any content sent while committing ?
730                 ByteBuffer contentBuffer = content.getContent();
731                 if (contentBuffer != null)
732                 {
733                     if (!someToContent(exchange, contentBuffer))
734                         return;
735                 }
736 
737                 while (true)
738                 {
739                     SenderState current = senderState.get();
740                     switch (current)
741                     {
742                         case SENDING:
743                         {
744                             contentCallback.iterate();
745                             return;
746                         }
747                         case SENDING_WITH_CONTENT:
748                         {
749                             // We have deferred content to send.
750                             updateSenderState(current, SenderState.SENDING);
751                             break;
752                         }
753                         case EXPECTING:
754                         {
755                             // We sent the headers, wait for the 100 Continue response.
756                             if (updateSenderState(current, SenderState.WAITING))
757                                 return;
758                             break;
759                         }
760                         case EXPECTING_WITH_CONTENT:
761                         {
762                             // We sent the headers, we have deferred content to send,
763                             // wait for the 100 Continue response.
764                             if (updateSenderState(current, SenderState.WAITING))
765                                 return;
766                             break;
767                         }
768                         case PROCEEDING:
769                         {
770                             // We sent the headers, we have the 100 Continue response,
771                             // we have no content to send.
772                             if (updateSenderState(current, SenderState.IDLE))
773                                 return;
774                             break;
775                         }
776                         case PROCEEDING_WITH_CONTENT:
777                         {
778                             // We sent the headers, we have the 100 Continue response,
779                             // we have deferred content to send.
780                             updateSenderState(current, SenderState.SENDING);
781                             break;
782                         }
783                         case FAILED:
784                         {
785                             return;
786                         }
787                         default:
788                         {
789                             illegalSenderState(current);
790                             return;
791                         }
792                     }
793                 }
794             }
795         }
796     }
797 
798     private class ContentCallback extends IteratingCallback
799     {
800         @Override
801         protected Action process() throws Exception
802         {
803             HttpExchange exchange = getHttpExchange();
804             if (exchange == null)
805                 return Action.IDLE;
806 
807             HttpContent content = HttpSender.this.content;
808             if (content == null)
809                 return Action.IDLE;
810 
811             while (true)
812             {
813                 boolean advanced = content.advance();
814                 boolean lastContent = content.isLast();
815                 if (LOG.isDebugEnabled())
816                     LOG.debug("Content present {}, last {}, consumed {} for {}", advanced, lastContent, content.isConsumed(), exchange.getRequest());
817 
818                 if (advanced)
819                 {
820                     sendContent(exchange, content, this);
821                     return Action.SCHEDULED;
822                 }
823 
824                 if (lastContent)
825                 {
826                     sendContent(exchange, content, lastCallback);
827                     return Action.IDLE;
828                 }
829 
830                 SenderState current = senderState.get();
831                 switch (current)
832                 {
833                     case SENDING:
834                     {
835                         if (updateSenderState(current, SenderState.IDLE))
836                         {
837                             if (LOG.isDebugEnabled())
838                                 LOG.debug("Content is deferred for {}", exchange.getRequest());
839                             return Action.IDLE;
840                         }
841                         break;
842                     }
843                     case SENDING_WITH_CONTENT:
844                     {
845                         updateSenderState(current, SenderState.SENDING);
846                         break;
847                     }
848                     default:
849                     {
850                         illegalSenderState(current);
851                         return Action.IDLE;
852                     }
853                 }
854             }
855         }
856 
857         @Override
858         public void succeeded()
859         {
860             HttpExchange exchange = getHttpExchange();
861             if (exchange == null)
862                 return;
863             HttpContent content = HttpSender.this.content;
864             if (content == null)
865                 return;
866             content.succeeded();
867             ByteBuffer buffer = content.getContent();
868             someToContent(exchange, buffer);
869             super.succeeded();
870         }
871 
872         @Override
873         public void onCompleteFailure(Throwable failure)
874         {
875             HttpContent content = HttpSender.this.content;
876             if (content == null)
877                 return;
878             content.failed(failure);
879             anyToFailure(failure);
880         }
881 
882         @Override
883         protected void onCompleteSuccess()
884         {
885             // Nothing to do, since we always return false from process().
886             // Termination is obtained via LastContentCallback.
887         }
888     }
889 
890     private class LastContentCallback implements Callback
891     {
892         @Override
893         public boolean isNonBlocking()
894         {
895             return content.isNonBlocking();
896         }
897 
898         @Override
899         public void succeeded()
900         {
901             HttpExchange exchange = getHttpExchange();
902             if (exchange == null)
903                 return;
904             HttpContent content = HttpSender.this.content;
905             if (content == null)
906                 return;
907             content.succeeded();
908             someToSuccess(exchange);
909         }
910 
911         @Override
912         public void failed(Throwable failure)
913         {
914             HttpContent content = HttpSender.this.content;
915             if (content == null)
916                 return;
917             content.failed(failure);
918             anyToFailure(failure);
919         }
920     }
921 }