View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.nio.ByteBuffer;
22  import java.util.concurrent.atomic.AtomicReference;
23  
24  import org.eclipse.jetty.client.api.ContentProvider;
25  import org.eclipse.jetty.client.api.Request;
26  import org.eclipse.jetty.client.api.Result;
27  import org.eclipse.jetty.http.HttpHeader;
28  import org.eclipse.jetty.http.HttpHeaderValue;
29  import org.eclipse.jetty.util.BufferUtil;
30  import org.eclipse.jetty.util.Callback;
31  import org.eclipse.jetty.util.IteratingCallback;
32  import org.eclipse.jetty.util.log.Log;
33  import org.eclipse.jetty.util.log.Logger;
34  
35  /**
36   * {@link HttpSender} abstracts the algorithm to send HTTP requests, so that subclasses only implement
37   * the transport-specific code to send requests over the wire, implementing
38   * {@link #sendHeaders(HttpExchange, HttpContent, Callback)} and
39   * {@link #sendContent(HttpExchange, HttpContent, Callback)}.
40   * <p />
41   * {@link HttpSender} governs two state machines.
42   * <p />
43   * The request state machine is updated by {@link HttpSender} as the various steps of sending a request
44   * are executed, see {@link RequestState}.
45   * At any point in time, a user thread may abort the request, which may (if the request has not been
46   * completely sent yet) move the request state machine to {@link RequestState#FAILURE}.
47   * The request state machine guarantees that the request steps are executed (by I/O threads) only if
48   * the request has not been failed already.
49   * <p />
50   * The sender state machine is updated by {@link HttpSender} from three sources: deferred content notifications
51   * (via {@link #onContent()}), 100-continue notifications (via {@link #proceed(HttpExchange, Throwable)})
52   * and normal request send (via {@link #sendContent(HttpExchange, HttpContent, Callback)}).
53   * This state machine must guarantee that the request sending is never executed concurrently: only one of
54   * those sources may trigger the call to {@link #sendContent(HttpExchange, HttpContent, Callback)}.
55   *
56   * @see HttpReceiver
57   */
58  public abstract class HttpSender implements AsyncContentProvider.Listener
59  {
60      protected static final Logger LOG = Log.getLogger(HttpSender.class);
61  
62      private final AtomicReference<RequestState> requestState = new AtomicReference<>(RequestState.QUEUED);
63      private final AtomicReference<SenderState> senderState = new AtomicReference<>(SenderState.IDLE);
64      private final Callback commitCallback = new CommitCallback();
65      private final IteratingCallback contentCallback = new ContentCallback();
66      private final Callback lastCallback = new LastContentCallback();
67      private final HttpChannel channel;
68      private HttpContent content;
69      private Throwable failure;
70  
71      protected HttpSender(HttpChannel channel)
72      {
73          this.channel = channel;
74      }
75  
76      protected HttpChannel getHttpChannel()
77      {
78          return channel;
79      }
80  
81      protected HttpExchange getHttpExchange()
82      {
83          return channel.getHttpExchange();
84      }
85  
86      @Override
87      public void onContent()
88      {
89          HttpExchange exchange = getHttpExchange();
90          if (exchange == null)
91              return;
92  
93          while (true)
94          {
95              SenderState current = senderState.get();
96              switch (current)
97              {
98                  case IDLE:
99                  {
100                     SenderState newSenderState = SenderState.SENDING;
101                     if (updateSenderState(current, newSenderState))
102                     {
103                         if (LOG.isDebugEnabled())
104                             LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
105                         contentCallback.iterate();
106                         return;
107                     }
108                     break;
109                 }
110                 case SENDING:
111                 {
112                     SenderState newSenderState = SenderState.SENDING_WITH_CONTENT;
113                     if (updateSenderState(current, newSenderState))
114                     {
115                         if (LOG.isDebugEnabled())
116                             LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
117                         return;
118                     }
119                     break;
120                 }
121                 case EXPECTING:
122                 {
123                     SenderState newSenderState = SenderState.EXPECTING_WITH_CONTENT;
124                     if (updateSenderState(current, newSenderState))
125                     {
126                         if (LOG.isDebugEnabled())
127                             LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
128                         return;
129                     }
130                     break;
131                 }
132                 case PROCEEDING:
133                 {
134                     SenderState newSenderState = SenderState.PROCEEDING_WITH_CONTENT;
135                     if (updateSenderState(current, newSenderState))
136                     {
137                         if (LOG.isDebugEnabled())
138                             LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
139                         return;
140                     }
141                     break;
142                 }
143                 case SENDING_WITH_CONTENT:
144                 case EXPECTING_WITH_CONTENT:
145                 case PROCEEDING_WITH_CONTENT:
146                 case WAITING:
147                 {
148                     if (LOG.isDebugEnabled())
149                         LOG.debug("Deferred content available, {}", current);
150                     return;
151                 }
152                 default:
153                 {
154                     throw illegalSenderState(current);
155                 }
156             }
157         }
158     }
159 
160     public void send(HttpExchange exchange)
161     {
162         Request request = exchange.getRequest();
163         Throwable cause = request.getAbortCause();
164         if (cause != null)
165         {
166             exchange.abort(cause);
167         }
168         else
169         {
170             if (!queuedToBegin(request))
171                 throw new IllegalStateException();
172 
173             ContentProvider contentProvider = request.getContent();
174             HttpContent content = this.content = new HttpContent(contentProvider);
175 
176             SenderState newSenderState = SenderState.SENDING;
177             if (expects100Continue(request))
178                 newSenderState = content.hasContent() ? SenderState.EXPECTING_WITH_CONTENT : SenderState.EXPECTING;
179             if (!updateSenderState(SenderState.IDLE, newSenderState))
180                 throw illegalSenderState(SenderState.IDLE);
181 
182             // Setting the listener may trigger calls to onContent() by other
183             // threads so we must set it only after the sender state has been updated
184             if (contentProvider instanceof AsyncContentProvider)
185                 ((AsyncContentProvider)contentProvider).setListener(this);
186 
187             if (!beginToHeaders(request))
188                 return;
189 
190             sendHeaders(exchange, content, commitCallback);
191         }
192     }
193 
194     protected boolean expects100Continue(Request request)
195     {
196         return request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
197     }
198 
199     protected boolean queuedToBegin(Request request)
200     {
201         if (!updateRequestState(RequestState.QUEUED, RequestState.TRANSIENT))
202             return false;
203         if (LOG.isDebugEnabled())
204             LOG.debug("Request begin {}", request);
205         RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
206         notifier.notifyBegin(request);
207         if (!updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN))
208             terminateRequest(getHttpExchange(), failure, false);
209         return true;
210     }
211 
212     protected boolean beginToHeaders(Request request)
213     {
214         if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT))
215             return false;
216         if (LOG.isDebugEnabled())
217             LOG.debug("Request headers {}{}{}", request, System.getProperty("line.separator"), request.getHeaders().toString().trim());
218         RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
219         notifier.notifyHeaders(request);
220         if (!updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
221             terminateRequest(getHttpExchange(), failure, false);
222         return true;
223     }
224 
225     protected boolean headersToCommit(Request request)
226     {
227         if (!updateRequestState(RequestState.HEADERS, RequestState.TRANSIENT))
228             return false;
229         if (LOG.isDebugEnabled())
230             LOG.debug("Request committed {}", request);
231         RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
232         notifier.notifyCommit(request);
233         if (!updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT))
234             terminateRequest(getHttpExchange(), failure, true);
235         return true;
236     }
237 
238     protected boolean someToContent(Request request, ByteBuffer content)
239     {
240         RequestState current = requestState.get();
241         switch (current)
242         {
243             case COMMIT:
244             case CONTENT:
245             {
246                 if (!updateRequestState(current, RequestState.TRANSIENT_CONTENT))
247                     return false;
248                 if (LOG.isDebugEnabled())
249                     LOG.debug("Request content {}{}{}", request, System.getProperty("line.separator"), BufferUtil.toDetailString(content));
250                 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
251                 notifier.notifyContent(request, content);
252                 if (!updateRequestState(RequestState.TRANSIENT_CONTENT, RequestState.CONTENT))
253                     terminateRequest(getHttpExchange(), failure, true);
254                 return true;
255             }
256             default:
257             {
258                 return false;
259             }
260         }
261     }
262 
263     protected boolean someToSuccess(HttpExchange exchange)
264     {
265         RequestState current = requestState.get();
266         switch (current)
267         {
268             case COMMIT:
269             case CONTENT:
270             {
271                 // Mark atomically the request as completed, with respect
272                 // to concurrency between request success and request failure.
273                 boolean completed = exchange.requestComplete();
274                 if (!completed)
275                     return false;
276 
277                 requestState.set(RequestState.QUEUED);
278 
279                 // Reset to be ready for another request.
280                 reset();
281 
282                 // Mark atomically the request as terminated and succeeded,
283                 // with respect to concurrency between request and response.
284                 Result result = exchange.terminateRequest(null);
285 
286                 Request request = exchange.getRequest();
287                 if (LOG.isDebugEnabled())
288                     LOG.debug("Request success {}", request);
289                 HttpDestination destination = getHttpChannel().getHttpDestination();
290                 destination.getRequestNotifier().notifySuccess(exchange.getRequest());
291 
292                 terminateRequest(exchange, null, true, result);
293 
294                 return true;
295             }
296             default:
297             {
298                 return false;
299             }
300         }
301     }
302 
303     protected boolean anyToFailure(Throwable failure)
304     {
305         HttpExchange exchange = getHttpExchange();
306         if (exchange == null)
307             return false;
308 
309         // Mark atomically the request as completed, with respect
310         // to concurrency between request success and request failure.
311         boolean completed = exchange.requestComplete();
312         if (!completed)
313             return false;
314 
315         this.failure = failure;
316 
317         // Update the state to avoid more request processing.
318         RequestState current;
319         boolean fail;
320         while (true)
321         {
322             current = requestState.get();
323             if (updateRequestState(current, RequestState.FAILURE))
324             {
325                 fail = current != RequestState.TRANSIENT && current != RequestState.TRANSIENT_CONTENT;
326                 break;
327             }
328         }
329 
330         dispose();
331 
332         // Mark atomically the request as terminated and failed,
333         // with respect to concurrency between request and response.
334         Result result = exchange.terminateRequest(failure);
335 
336         Request request = exchange.getRequest();
337         if (LOG.isDebugEnabled())
338             LOG.debug("Request failure {} {}", exchange, failure);
339         HttpDestination destination = getHttpChannel().getHttpDestination();
340         destination.getRequestNotifier().notifyFailure(request, failure);
341 
342         if (fail)
343         {
344             terminateRequest(exchange, failure, !isBeforeCommit(current), result);
345         }
346         else
347         {
348             if (LOG.isDebugEnabled())
349                 LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
350         }
351 
352         return true;
353     }
354 
355     private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed)
356     {
357         if (exchange != null)
358         {
359             Result result = exchange.terminateRequest(failure);
360             terminateRequest(exchange, failure, committed, result);
361         }
362     }
363 
364     private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed, Result result)
365     {
366         Request request = exchange.getRequest();
367 
368         if (LOG.isDebugEnabled())
369             LOG.debug("Terminating request {}", request);
370 
371         if (failure != null && !committed && result == null && request.getAbortCause() == null)
372         {
373             // Complete the response from here
374             if (exchange.responseComplete())
375             {
376                 result = exchange.terminateResponse(failure);
377                 if (LOG.isDebugEnabled())
378                     LOG.debug("Failed response from request {}", exchange);
379             }
380         }
381 
382         if (result != null)
383         {
384             HttpDestination destination = getHttpChannel().getHttpDestination();
385             boolean ordered = destination.getHttpClient().isStrictEventOrdering();
386             if (!ordered)
387                 channel.exchangeTerminated(result);
388             if (LOG.isDebugEnabled())
389                 LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", request);
390             HttpConversation conversation = exchange.getConversation();
391             destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
392             if (ordered)
393                 channel.exchangeTerminated(result);
394         }
395     }
396 
397     /**
398      * Implementations should send the HTTP headers over the wire, possibly with some content,
399      * in a single write, and notify the given {@code callback} of the result of this operation.
400      * <p />
401      * If there is more content to send, then {@link #sendContent(HttpExchange, HttpContent, Callback)}
402      * will be invoked.
403      *
404      * @param exchange the exchange to send
405      * @param content the content to send
406      * @param callback the callback to notify
407      */
408     protected abstract void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback);
409 
410     /**
411      * Implementations should send the content at the {@link HttpContent} cursor position over the wire.
412      * <p />
413      * The {@link HttpContent} cursor is advanced by {@link HttpSender} at the right time, and if more
414      * content needs to be sent, this method is invoked again; subclasses need only to send the content
415      * at the {@link HttpContent} cursor position.
416      * <p />
417      * This method is invoked one last time when {@link HttpContent#isConsumed()} is true and therefore
418      * there is no actual content to send.
419      * This is done to allow subclasses to write "terminal" bytes (such as the terminal chunk when the
420      * transfer encoding is chunked) if their protocol needs to.
421      *
422      * @param exchange the exchange to send
423      * @param content the content to send
424      * @param callback the callback to notify
425      */
426     protected abstract void sendContent(HttpExchange exchange, HttpContent content, Callback callback);
427 
428     protected void reset()
429     {
430         content.close();
431         content = null;
432         senderState.set(SenderState.IDLE);
433     }
434 
435     protected void dispose()
436     {
437         HttpContent content = this.content;
438         if (content != null)
439             content.close();
440     }
441 
442     public void proceed(HttpExchange exchange, Throwable failure)
443     {
444         if (!expects100Continue(exchange.getRequest()))
445             return;
446 
447         if (failure != null)
448         {
449             anyToFailure(failure);
450             return;
451         }
452 
453         while (true)
454         {
455             SenderState current = senderState.get();
456             switch (current)
457             {
458                 case EXPECTING:
459                 {
460                     // We are still sending the headers, but we already got the 100 Continue.
461                     if (updateSenderState(current, SenderState.PROCEEDING))
462                     {
463                         if (LOG.isDebugEnabled())
464                             LOG.debug("Proceeding while expecting");
465                         return;
466                     }
467                     break;
468                 }
469                 case EXPECTING_WITH_CONTENT:
470                 {
471                     // More deferred content was submitted to onContent(), we already
472                     // got the 100 Continue, but we may be still sending the headers
473                     // (for example, with SSL we may have sent the encrypted data,
474                     // received the 100 Continue but not yet updated the decrypted
475                     // WriteFlusher so sending more content now may result in a
476                     // WritePendingException).
477                     if (updateSenderState(current, SenderState.PROCEEDING_WITH_CONTENT))
478                     {
479                         if (LOG.isDebugEnabled())
480                             LOG.debug("Proceeding while scheduled");
481                         return;
482                     }
483                     break;
484                 }
485                 case WAITING:
486                 {
487                     // We received the 100 Continue, now send the content if any.
488                     if (!updateSenderState(current, SenderState.SENDING))
489                         throw illegalSenderState(current);
490                     if (LOG.isDebugEnabled())
491                         LOG.debug("Proceeding while waiting");
492                     contentCallback.iterate();
493                     return;
494                 }
495                 default:
496                 {
497                     throw illegalSenderState(current);
498                 }
499             }
500         }
501     }
502 
503     public boolean abort(Throwable failure)
504     {
505         RequestState current = requestState.get();
506         boolean abortable = isBeforeCommit(current) || isSending(current);
507         return abortable && anyToFailure(failure);
508     }
509 
510     private boolean updateRequestState(RequestState from, RequestState to)
511     {
512         boolean updated = requestState.compareAndSet(from, to);
513         if (!updated)
514             LOG.debug("RequestState update failed: {} -> {}: {}", from, to, requestState.get());
515         return updated;
516     }
517 
518     private boolean updateSenderState(SenderState from, SenderState to)
519     {
520         boolean updated = senderState.compareAndSet(from, to);
521         if (!updated)
522             LOG.debug("SenderState update failed: {} -> {}: {}", from, to, senderState.get());
523         return updated;
524     }
525 
526     private boolean isBeforeCommit(RequestState requestState)
527     {
528         switch (requestState)
529         {
530             case TRANSIENT:
531             case QUEUED:
532             case BEGIN:
533             case HEADERS:
534                 return true;
535             default:
536                 return false;
537         }
538     }
539 
540     private boolean isSending(RequestState requestState)
541     {
542         switch (requestState)
543         {
544             case TRANSIENT_CONTENT:
545             case COMMIT:
546             case CONTENT:
547                 return true;
548             default:
549                 return false;
550         }
551     }
552 
553     private RuntimeException illegalSenderState(SenderState current)
554     {
555         return new IllegalStateException("Expected " + current + " found " + senderState.get() + " instead");
556     }
557 
558     /**
559      * The request states {@link HttpSender} goes through when sending a request.
560      */
561     private enum RequestState
562     {
563         /**
564          * One of the state transition methods is being executed.
565          */
566         TRANSIENT,
567         /**
568          * The content transition method is being executed.
569          */
570         TRANSIENT_CONTENT,
571         /**
572          * The request is queued, the initial state
573          */
574         QUEUED,
575         /**
576          * The request has been dequeued
577          */
578         BEGIN,
579         /**
580          * The request headers (and possibly some content) is about to be sent
581          */
582         HEADERS,
583         /**
584          * The request headers (and possibly some content) have been sent
585          */
586         COMMIT,
587         /**
588          * The request content is being sent
589          */
590         CONTENT,
591         /**
592          * The request is failed
593          */
594         FAILURE
595     }
596 
597     /**
598      * The sender states {@link HttpSender} goes through when sending a request.
599      */
600     private enum SenderState
601     {
602         /**
603          * {@link HttpSender} is not sending request headers nor request content
604          */
605         IDLE,
606         /**
607          * {@link HttpSender} is sending the request header or request content
608          */
609         SENDING,
610         /**
611          * {@link HttpSender} is currently sending the request, and deferred content is available to be sent
612          */
613         SENDING_WITH_CONTENT,
614         /**
615          * {@link HttpSender} is sending the headers but will wait for 100 Continue before sending the content
616          */
617         EXPECTING,
618         /**
619          * {@link HttpSender} is currently sending the headers, will wait for 100 Continue, and deferred content is available to be sent
620          */
621         EXPECTING_WITH_CONTENT,
622         /**
623          * {@link HttpSender} has sent the headers and is waiting for 100 Continue
624          */
625         WAITING,
626         /**
627          * {@link HttpSender} is sending the headers, while 100 Continue has arrived
628          */
629         PROCEEDING,
630         /**
631          * {@link HttpSender} is sending the headers, while 100 Continue has arrived, and deferred content is available to be sent
632          */
633         PROCEEDING_WITH_CONTENT
634     }
635 
636     private class CommitCallback implements Callback
637     {
638         @Override
639         public void succeeded()
640         {
641             try
642             {
643                 content.succeeded();
644                 process();
645             }
646             // Catch-all for runtime exceptions
647             catch (Exception x)
648             {
649                 anyToFailure(x);
650             }
651         }
652 
653         @Override
654         public void failed(Throwable failure)
655         {
656             content.failed(failure);
657             anyToFailure(failure);
658         }
659 
660         private void process() throws Exception
661         {
662             HttpExchange exchange = getHttpExchange();
663             if (exchange == null)
664                 return;
665 
666             Request request = exchange.getRequest();
667             if (!headersToCommit(request))
668                 return;
669 
670             HttpContent content = HttpSender.this.content;
671 
672             if (!content.hasContent())
673             {
674                 // No content to send, we are done.
675                 someToSuccess(exchange);
676             }
677             else
678             {
679                 // Was any content sent while committing ?
680                 ByteBuffer contentBuffer = content.getContent();
681                 if (contentBuffer != null)
682                 {
683                     if (!someToContent(request, contentBuffer))
684                         return;
685                 }
686 
687                 while (true)
688                 {
689                     SenderState current = senderState.get();
690                     switch (current)
691                     {
692                         case SENDING:
693                         {
694                             contentCallback.iterate();
695                             return;
696                         }
697                         case SENDING_WITH_CONTENT:
698                         {
699                             // We have deferred content to send.
700                             updateSenderState(current, SenderState.SENDING);
701                             break;
702                         }
703                         case EXPECTING:
704                         {
705                             // We sent the headers, wait for the 100 Continue response.
706                             if (updateSenderState(current, SenderState.WAITING))
707                                 return;
708                             break;
709                         }
710                         case EXPECTING_WITH_CONTENT:
711                         {
712                             // We sent the headers, we have deferred content to send,
713                             // wait for the 100 Continue response.
714                             if (updateSenderState(current, SenderState.WAITING))
715                                 return;
716                             break;
717                         }
718                         case PROCEEDING:
719                         {
720                             // We sent the headers, we have the 100 Continue response,
721                             // we have no content to send.
722                             if (updateSenderState(current, SenderState.IDLE))
723                                 return;
724                             break;
725                         }
726                         case PROCEEDING_WITH_CONTENT:
727                         {
728                             // We sent the headers, we have the 100 Continue response,
729                             // we have deferred content to send.
730                             updateSenderState(current, SenderState.SENDING);
731                             break;
732                         }
733                         default:
734                         {
735                             throw illegalSenderState(current);
736                         }
737                     }
738                 }
739             }
740         }
741     }
742 
743     private class ContentCallback extends IteratingCallback
744     {
745         @Override
746         protected Action process() throws Exception
747         {
748             HttpExchange exchange = getHttpExchange();
749             if (exchange == null)
750                 return Action.IDLE;
751 
752             HttpContent content = HttpSender.this.content;
753             while (true)
754             {
755                 boolean advanced = content.advance();
756                 boolean consumed = content.isConsumed();
757                 if (LOG.isDebugEnabled())
758                     LOG.debug("Content {} consumed {} for {}", advanced, consumed, exchange.getRequest());
759 
760                 if (advanced)
761                 {
762                     sendContent(exchange, content, this);
763                     return Action.SCHEDULED;
764                 }
765 
766                 if (consumed)
767                 {
768                     sendContent(exchange, content, lastCallback);
769                     return Action.IDLE;
770                 }
771 
772                 SenderState current = HttpSender.this.senderState.get();
773                 switch (current)
774                 {
775                     case SENDING:
776                     {
777                         if (updateSenderState(current, SenderState.IDLE))
778                         {
779                             if (LOG.isDebugEnabled())
780                                 LOG.debug("Content is deferred for {}", exchange.getRequest());
781                             return Action.IDLE;
782                         }
783                         break;
784                     }
785                     case SENDING_WITH_CONTENT:
786                     {
787                         updateSenderState(current, SenderState.SENDING);
788                         break;
789                     }
790                     default:
791                     {
792                         throw illegalSenderState(current);
793                     }
794                 }
795             }
796         }
797 
798         @Override
799         public void succeeded()
800         {
801             ByteBuffer buffer = content.getContent();
802             someToContent(getHttpExchange().getRequest(), buffer);
803             content.succeeded();
804             super.succeeded();
805         }
806 
807         @Override
808         public void onCompleteFailure(Throwable failure)
809         {
810             content.failed(failure);
811             anyToFailure(failure);
812         }
813 
814         @Override
815         protected void onCompleteSuccess()
816         {
817             // Nothing to do, since we always return false from process().
818             // Termination is obtained via LastContentCallback.
819         }
820     }
821 
822     private class LastContentCallback implements Callback
823     {
824         @Override
825         public void succeeded()
826         {
827             content.succeeded();
828             HttpExchange exchange = getHttpExchange();
829             if (exchange == null)
830                 return;
831             someToSuccess(exchange);
832         }
833 
834         @Override
835         public void failed(Throwable failure)
836         {
837             content.failed(failure);
838             anyToFailure(failure);
839         }
840     }
841 }