View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.nio.ByteBuffer;
22  import java.util.concurrent.atomic.AtomicReference;
23  
24  import org.eclipse.jetty.client.api.ContentProvider;
25  import org.eclipse.jetty.client.api.Request;
26  import org.eclipse.jetty.client.api.Result;
27  import org.eclipse.jetty.http.HttpHeader;
28  import org.eclipse.jetty.http.HttpHeaderValue;
29  import org.eclipse.jetty.util.BufferUtil;
30  import org.eclipse.jetty.util.Callback;
31  import org.eclipse.jetty.util.IteratingCallback;
32  import org.eclipse.jetty.util.log.Log;
33  import org.eclipse.jetty.util.log.Logger;
34  
35  /**
36   * {@link HttpSender} abstracts the algorithm to send HTTP requests, so that subclasses only implement
37   * the transport-specific code to send requests over the wire, implementing
38   * {@link #sendHeaders(HttpExchange, HttpContent, Callback)} and
39   * {@link #sendContent(HttpExchange, HttpContent, Callback)}.
40   * <p />
41   * {@link HttpSender} governs two state machines.
42   * <p />
43   * The request state machine is updated by {@link HttpSender} as the various steps of sending a request
44   * are executed, see {@link RequestState}.
45   * At any point in time, a user thread may abort the request, which may (if the request has not been
46   * completely sent yet) move the request state machine to {@link RequestState#FAILURE}.
47   * The request state machine guarantees that the request steps are executed (by I/O threads) only if
48   * the request has not been failed already.
49   * <p />
50   * The sender state machine is updated by {@link HttpSender} from three sources: deferred content notifications
51   * (via {@link #onContent()}), 100-continue notifications (via {@link #proceed(HttpExchange, boolean)})
52   * and normal request send (via {@link #sendContent(HttpExchange, HttpContent, Callback)}).
53   * This state machine must guarantee that the request sending is never executed concurrently: only one of
54   * those sources may trigger the call to {@link #sendContent(HttpExchange, HttpContent, Callback)}.
55   *
56   * @see HttpReceiver
57   */
58  public abstract class HttpSender implements AsyncContentProvider.Listener
59  {
60      protected static final Logger LOG = Log.getLogger(HttpSender.class);
61  
62      private final AtomicReference<RequestState> requestState = new AtomicReference<>(RequestState.QUEUED);
63      private final AtomicReference<SenderState> senderState = new AtomicReference<>(SenderState.IDLE);
64      private final Callback commitCallback = new CommitCallback();
65      private final Callback contentCallback = new ContentCallback();
66      private final Callback lastCallback = new LastContentCallback();
67      private final HttpChannel channel;
68      private volatile HttpContent content;
69  
70      protected HttpSender(HttpChannel channel)
71      {
72          this.channel = channel;
73      }
74  
75      protected HttpChannel getHttpChannel()
76      {
77          return channel;
78      }
79  
80      protected HttpExchange getHttpExchange()
81      {
82          return channel.getHttpExchange();
83      }
84  
85      @Override
86      public void onContent()
87      {
88          HttpExchange exchange = getHttpExchange();
89          if (exchange == null)
90              return;
91  
92          while (true)
93          {
94              SenderState current = senderState.get();
95              switch (current)
96              {
97                  case IDLE:
98                  {
99                      if (updateSenderState(current, SenderState.SENDING))
100                     {
101                         LOG.debug("Deferred content available, idle -> sending");
102                         HttpContent content = this.content;
103                         content.advance();
104                         sendContent(exchange, content, contentCallback);
105                         return;
106                     }
107                     break;
108                 }
109                 case SENDING:
110                 {
111                     if (updateSenderState(current, SenderState.SCHEDULED))
112                     {
113                         LOG.debug("Deferred content available, sending -> scheduled");
114                         return;
115                     }
116                     break;
117                 }
118                 case EXPECTING:
119                 {
120                     if (updateSenderState(current, SenderState.SCHEDULED))
121                     {
122                         LOG.debug("Deferred content available, expecting -> scheduled");
123                         return;
124                     }
125                     break;
126                 }
127                 case WAITING:
128                 {
129                     LOG.debug("Deferred content available, waiting");
130                     return;
131                 }
132                 case SCHEDULED:
133                 {
134                     LOG.debug("Deferred content available, scheduled");
135                     return;
136                 }
137                 default:
138                 {
139                     throw new IllegalStateException();
140                 }
141             }
142         }
143     }
144 
145     public void send(HttpExchange exchange)
146     {
147         Request request = exchange.getRequest();
148         Throwable cause = request.getAbortCause();
149         if (cause != null)
150         {
151             exchange.abort(cause);
152         }
153         else
154         {
155             if (!queuedToBegin(request))
156                 throw new IllegalStateException();
157 
158             if (!updateSenderState(SenderState.IDLE, expects100Continue(request) ? SenderState.EXPECTING : SenderState.SENDING))
159                 throw new IllegalStateException();
160 
161             ContentProvider contentProvider = request.getContent();
162             HttpContent content = this.content = new HttpContent(contentProvider);
163 
164             // Setting the listener may trigger calls to onContent() by other
165             // threads so we must set it only after the sender state has been updated
166             if (contentProvider instanceof AsyncContentProvider)
167                 ((AsyncContentProvider)contentProvider).setListener(this);
168 
169             if (!beginToHeaders(request))
170                 return;
171 
172             sendHeaders(exchange, content, commitCallback);
173         }
174     }
175 
176     protected boolean expects100Continue(Request request)
177     {
178         return request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
179     }
180 
181     protected boolean queuedToBegin(Request request)
182     {
183         if (!updateRequestState(RequestState.QUEUED, RequestState.BEGIN))
184             return false;
185         LOG.debug("Request begin {}", request);
186         RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
187         notifier.notifyBegin(request);
188         return true;
189     }
190 
191     protected boolean beginToHeaders(Request request)
192     {
193         if (!updateRequestState(RequestState.BEGIN, RequestState.HEADERS))
194             return false;
195         if (LOG.isDebugEnabled())
196             LOG.debug("Request headers {}{}{}", request, System.getProperty("line.separator"), request.getHeaders().toString().trim());
197         RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
198         notifier.notifyHeaders(request);
199         return true;
200     }
201 
202     protected boolean headersToCommit(Request request)
203     {
204         if (!updateRequestState(RequestState.HEADERS, RequestState.COMMIT))
205             return false;
206         LOG.debug("Request committed {}", request);
207         RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
208         notifier.notifyCommit(request);
209         return true;
210     }
211 
212     protected boolean someToContent(Request request, ByteBuffer content)
213     {
214         RequestState current = requestState.get();
215         switch (current)
216         {
217             case COMMIT:
218             case CONTENT:
219             {
220                 if (!updateRequestState(current, RequestState.CONTENT))
221                     return false;
222                 if (LOG.isDebugEnabled())
223                     LOG.debug("Request content {}{}{}", request, System.getProperty("line.separator"), BufferUtil.toDetailString(content));
224                 RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
225                 notifier.notifyContent(request, content);
226                 return true;
227             }
228             case FAILURE:
229             {
230                 return false;
231             }
232             default:
233             {
234                 throw new IllegalStateException();
235             }
236         }
237     }
238 
239     protected boolean someToSuccess(HttpExchange exchange)
240     {
241         RequestState current = requestState.get();
242         switch (current)
243         {
244             case COMMIT:
245             case CONTENT:
246             {
247                 // Mark atomically the request as completed, with respect
248                 // to concurrency between request success and request failure.
249                 boolean completed = exchange.requestComplete();
250                 if (!completed)
251                     return false;
252 
253                 // Reset to be ready for another request
254                 reset();
255 
256                 // Mark atomically the request as terminated and succeeded,
257                 // with respect to concurrency between request and response.
258                 Result result = exchange.terminateRequest(null);
259 
260                 // It is important to notify completion *after* we reset because
261                 // the notification may trigger another request/response
262                 Request request = exchange.getRequest();
263                 LOG.debug("Request success {}", request);
264                 HttpDestination destination = getHttpChannel().getHttpDestination();
265                 destination.getRequestNotifier().notifySuccess(exchange.getRequest());
266 
267                 if (result != null)
268                 {
269                     boolean ordered = destination.getHttpClient().isStrictEventOrdering();
270                     if (!ordered)
271                         channel.exchangeTerminated(result);
272                     LOG.debug("Request/Response succeded {}", request);
273                     HttpConversation conversation = exchange.getConversation();
274                     destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
275                     if (ordered)
276                         channel.exchangeTerminated(result);
277                 }
278 
279                 return true;
280             }
281             case FAILURE:
282             {
283                 return false;
284             }
285             default:
286             {
287                 throw new IllegalStateException();
288             }
289         }
290     }
291 
292     protected boolean anyToFailure(Throwable failure)
293     {
294         HttpExchange exchange = getHttpExchange();
295         if (exchange == null)
296             return false;
297 
298         // Mark atomically the request as completed, with respect
299         // to concurrency between request success and request failure.
300         boolean completed = exchange.requestComplete();
301         if (!completed)
302             return false;
303 
304         // Dispose to avoid further requests
305         RequestState requestState = dispose();
306 
307         // Mark atomically the request as terminated and failed,
308         // with respect to concurrency between request and response.
309         Result result = exchange.terminateRequest(failure);
310 
311         Request request = exchange.getRequest();
312         LOG.debug("Request failure {} {}", exchange, failure);
313         HttpDestination destination = getHttpChannel().getHttpDestination();
314         destination.getRequestNotifier().notifyFailure(request, failure);
315 
316         boolean notCommitted = isBeforeCommit(requestState);
317         if (result == null && notCommitted && request.getAbortCause() == null)
318         {
319             // Complete the response from here
320             if (exchange.responseComplete())
321             {
322                 result = exchange.terminateResponse(failure);
323                 LOG.debug("Failed response from request {}", exchange);
324             }
325         }
326 
327         if (result != null)
328         {
329             boolean ordered = destination.getHttpClient().isStrictEventOrdering();
330             if (!ordered)
331                 channel.exchangeTerminated(result);
332             LOG.debug("Request/Response failed {}", request);
333             HttpConversation conversation = exchange.getConversation();
334             destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
335             if (ordered)
336                 channel.exchangeTerminated(result);
337         }
338 
339         return true;
340     }
341 
342     /**
343      * Implementations should send the HTTP headers over the wire, possibly with some content,
344      * in a single write, and notify the given {@code callback} of the result of this operation.
345      * <p />
346      * If there is more content to send, then {@link #sendContent(HttpExchange, HttpContent, Callback)}
347      * will be invoked.
348      *
349      * @param exchange the exchange to send
350      * @param content the content to send
351      * @param callback the callback to notify
352      */
353     protected abstract void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback);
354 
355     /**
356      * Implementations should send the content at the {@link HttpContent} cursor position over the wire.
357      * <p />
358      * The {@link HttpContent} cursor is advanced by {@link HttpSender} at the right time, and if more
359      * content needs to be sent, this method is invoked again; subclasses need only to send the content
360      * at the {@link HttpContent} cursor position.
361      * <p />
362      * This method is invoked one last time when {@link HttpContent#isConsumed()} is true and therefore
363      * there is no actual content to send.
364      * This is done to allow subclasses to write "terminal" bytes (such as the terminal chunk when the
365      * transfer encoding is chunked) if their protocol needs to.
366      *
367      * @param exchange the exchange to send
368      * @param content the content to send
369      * @param callback the callback to notify
370      */
371     protected abstract void sendContent(HttpExchange exchange, HttpContent content, Callback callback);
372 
373     protected void reset()
374     {
375         content = null;
376         requestState.set(RequestState.QUEUED);
377         senderState.set(SenderState.IDLE);
378     }
379 
380     protected RequestState dispose()
381     {
382         while (true)
383         {
384             RequestState current = requestState.get();
385             if (updateRequestState(current, RequestState.FAILURE))
386                 return current;
387         }
388     }
389 
390     public void proceed(HttpExchange exchange, boolean proceed)
391     {
392         if (!expects100Continue(exchange.getRequest()))
393             return;
394 
395         if (proceed)
396         {
397             while (true)
398             {
399                 SenderState current = senderState.get();
400                 switch (current)
401                 {
402                     case EXPECTING:
403                     {
404                         // We are still sending the headers, but we already got the 100 Continue.
405                         // Move to SEND so that the commit callback can send the content.
406                         if (!updateSenderState(current, SenderState.SENDING))
407                             break;
408                         LOG.debug("Proceed while expecting");
409                         return;
410                     }
411                     case WAITING:
412                     {
413                         // We received the 100 Continue, send the content if any.
414                         // First update the sender state to be sure to be the one
415                         // to call sendContent() since we race with onContent().
416                         if (!updateSenderState(current, SenderState.SENDING))
417                             break;
418                         HttpContent content = this.content;
419                         if (content.advance())
420                         {
421                             // There is content to send
422                             LOG.debug("Proceed while waiting");
423                             sendContent(exchange, content, contentCallback);
424                         }
425                         else
426                         {
427                             // No content to send yet - it's deferred.
428                             // We may fail the update as onContent() moved to SCHEDULE.
429                             if (!updateSenderState(SenderState.SENDING, SenderState.IDLE))
430                                 break;
431                             LOG.debug("Proceed deferred");
432                         }
433                         return;
434                     }
435                     case SCHEDULED:
436                     {
437                         // We lost the race with onContent() to update the state, try again
438                         if (!updateSenderState(current, SenderState.WAITING))
439                             throw new IllegalStateException();
440                         LOG.debug("Proceed while scheduled");
441                         break;
442                     }
443                     default:
444                     {
445                         throw new IllegalStateException();
446                     }
447                 }
448             }
449         }
450         else
451         {
452             anyToFailure(new HttpRequestException("Expectation failed", exchange.getRequest()));
453         }
454     }
455 
456     public boolean abort(Throwable failure)
457     {
458         RequestState current = requestState.get();
459         boolean abortable = isBeforeCommit(current) ||
460                 isSending(current) && !content.isLast();
461         return abortable && anyToFailure(failure);
462     }
463 
464     protected boolean updateRequestState(RequestState from, RequestState to)
465     {
466         boolean updated = requestState.compareAndSet(from, to);
467         if (!updated)
468             LOG.debug("RequestState update failed: {} -> {}: {}", from, to, requestState.get());
469         return updated;
470     }
471 
472     private boolean updateSenderState(SenderState from, SenderState to)
473     {
474         boolean updated = senderState.compareAndSet(from, to);
475         if (!updated)
476             LOG.debug("SenderState update failed: {} -> {}: {}", from, to, senderState.get());
477         return updated;
478     }
479 
480     private boolean isBeforeCommit(RequestState requestState)
481     {
482         switch (requestState)
483         {
484             case QUEUED:
485             case BEGIN:
486             case HEADERS:
487                 return true;
488             default:
489                 return false;
490         }
491     }
492 
493     private boolean isSending(RequestState requestState)
494     {
495         switch (requestState)
496         {
497             case COMMIT:
498             case CONTENT:
499                 return true;
500             default:
501                 return false;
502         }
503     }
504 
505     /**
506      * The request states {@link HttpSender} goes through when sending a request.
507      */
508     protected enum RequestState
509     {
510         /**
511          * The request is queued, the initial state
512          */
513         QUEUED,
514         /**
515          * The request has been dequeued
516          */
517         BEGIN,
518         /**
519          * The request headers (and possibly some content) is about to be sent
520          */
521         HEADERS,
522         /**
523          * The request headers (and possibly some content) have been sent
524          */
525         COMMIT,
526         /**
527          * The request content is being sent
528          */
529         CONTENT,
530         /**
531          * The request is failed
532          */
533         FAILURE
534     }
535 
536     /**
537      * The sender states {@link HttpSender} goes through when sending a request.
538      */
539     private enum SenderState
540     {
541         /**
542          * {@link HttpSender} is not sending the request
543          */
544         IDLE,
545         /**
546          * {@link HttpSender} is sending the request
547          */
548         SENDING,
549         /**
550          * {@link HttpSender} is sending the headers but will wait for 100-Continue before sending the content
551          */
552         EXPECTING,
553         /**
554          * {@link HttpSender} is waiting for 100-Continue
555          */
556         WAITING,
557         /**
558          * {@link HttpSender} is currently sending the request, and deferred content is available to be sent
559          */
560         SCHEDULED
561     }
562 
563     private class CommitCallback implements Callback
564     {
565         @Override
566         public void succeeded()
567         {
568             try
569             {
570                 process();
571             }
572             // Catch-all for runtime exceptions
573             catch (Exception x)
574             {
575                 anyToFailure(x);
576             }
577         }
578 
579         private void process() throws Exception
580         {
581             HttpExchange exchange = getHttpExchange();
582             if (exchange == null)
583                 return;
584 
585             Request request = exchange.getRequest();
586             if (!headersToCommit(request))
587                 return;
588 
589             HttpContent content = HttpSender.this.content;
590 
591             if (!content.hasContent())
592             {
593                 // No content to send, we are done.
594                 someToSuccess(exchange);
595             }
596             else
597             {
598                 // Was any content sent while committing ?
599                 ByteBuffer contentBuffer = content.getContent();
600                 if (contentBuffer != null)
601                 {
602                     if (!someToContent(request, contentBuffer))
603                         return;
604                 }
605 
606                 while (true)
607                 {
608                     SenderState current = senderState.get();
609                     switch (current)
610                     {
611                         case SENDING:
612                         {
613                             // We have content to send ?
614                             if (content.advance())
615                             {
616                                 sendContent(exchange, content, contentCallback);
617                             }
618                             else
619                             {
620                                 if (content.isConsumed())
621                                 {
622                                     sendContent(exchange, content, lastCallback);
623                                 }
624                                 else
625                                 {
626                                     if (!updateSenderState(current, SenderState.IDLE))
627                                         break;
628                                     LOG.debug("Waiting for deferred content for {}", request);
629                                 }
630                             }
631                             return;
632                         }
633                         case EXPECTING:
634                         {
635                             // Wait for the 100 Continue response
636                             if (!updateSenderState(current, SenderState.WAITING))
637                                 break;
638                             return;
639                         }
640                         case SCHEDULED:
641                         {
642                             if (expects100Continue(request))
643                                 return;
644                             // We have deferred content to send.
645                             updateSenderState(current, SenderState.SENDING);
646                             break;
647                         }
648                         default:
649                         {
650                             throw new IllegalStateException();
651                         }
652                     }
653                 }
654             }
655         }
656 
657         @Override
658         public void failed(Throwable failure)
659         {
660             anyToFailure(failure);
661         }
662     }
663 
664     private class ContentCallback extends IteratingCallback
665     {
666         @Override
667         protected boolean process() throws Exception
668         {
669             HttpExchange exchange = getHttpExchange();
670             if (exchange == null)
671                 return false;
672 
673             Request request = exchange.getRequest();
674             HttpContent content = HttpSender.this.content;
675 
676             ByteBuffer contentBuffer = content.getContent();
677             if (contentBuffer != null)
678             {
679                 if (!someToContent(request, contentBuffer))
680                     return false;
681             }
682 
683             if (content.advance())
684             {
685                 // There is more content to send
686                 sendContent(exchange, content, this);
687             }
688             else
689             {
690                 if (content.isConsumed())
691                 {
692                     sendContent(exchange, content, lastCallback);
693                 }
694                 else
695                 {
696                     while (true)
697                     {
698                         SenderState current = senderState.get();
699                         switch (current)
700                         {
701                             case SENDING:
702                             {
703                                 if (updateSenderState(current, SenderState.IDLE))
704                                 {
705                                     LOG.debug("Waiting for deferred content for {}", request);
706                                     return false;
707                                 }
708                                 break;
709                             }
710                             case SCHEDULED:
711                             {
712                                 if (updateSenderState(current, SenderState.SENDING))
713                                 {
714                                     LOG.debug("Deferred content available for {}", request);
715                                     // TODO: this case is not covered by tests
716                                     sendContent(exchange, content, this);
717                                     return false;
718                                 }
719                                 break;
720                             }
721                             default:
722                             {
723                                 throw new IllegalStateException();
724                             }
725                         }
726                     }
727                 }
728             }
729             return false;
730         }
731 
732         @Override
733         protected void completed()
734         {
735             // Nothing to do, since we always return false from process().
736             // Termination is obtained via LastContentCallback.
737         }
738 
739         @Override
740         public void failed(Throwable failure)
741         {
742             super.failed(failure);
743             anyToFailure(failure);
744         }
745     }
746 
747     private class LastContentCallback implements Callback
748     {
749         @Override
750         public void succeeded()
751         {
752             HttpExchange exchange = getHttpExchange();
753             if (exchange == null)
754                 return;
755             someToSuccess(exchange);
756         }
757 
758         @Override
759         public void failed(Throwable failure)
760         {
761             anyToFailure(failure);
762         }
763     }
764 }