View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.nio.ByteBuffer;
22  import java.util.Collections;
23  import java.util.Iterator;
24  import java.util.concurrent.CountDownLatch;
25  import java.util.concurrent.Executor;
26  import java.util.concurrent.atomic.AtomicMarkableReference;
27  import java.util.concurrent.atomic.AtomicReference;
28  
29  import org.eclipse.jetty.client.api.ContentProvider;
30  import org.eclipse.jetty.client.api.Request;
31  import org.eclipse.jetty.client.api.Result;
32  import org.eclipse.jetty.http.HttpGenerator;
33  import org.eclipse.jetty.http.HttpHeader;
34  import org.eclipse.jetty.http.HttpHeaderValue;
35  import org.eclipse.jetty.io.ByteBufferPool;
36  import org.eclipse.jetty.io.EndPoint;
37  import org.eclipse.jetty.util.BufferUtil;
38  import org.eclipse.jetty.util.Callback;
39  import org.eclipse.jetty.util.log.Log;
40  import org.eclipse.jetty.util.log.Logger;
41  
42  public class HttpSender implements AsyncContentProvider.Listener
43  {
44      private static final Logger LOG = Log.getLogger(HttpSender.class);
45      private static final String EXPECT_100_ATTRIBUTE = HttpSender.class.getName() + ".expect100";
46  
47      private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
48      private final AtomicReference<SendState> sendState = new AtomicReference<>(SendState.IDLE);
49      private final HttpGenerator generator = new HttpGenerator();
50      private final HttpConnection connection;
51      private Iterator<ByteBuffer> contentIterator;
52      private ContinueContentChunk continueContentChunk;
53  
54      public HttpSender(HttpConnection connection)
55      {
56          this.connection = connection;
57      }
58  
59      @Override
60      public void onContent()
61      {
62          while (true)
63          {
64              SendState current = sendState.get();
65              switch (current)
66              {
67                  case IDLE:
68                  {
69                      if (updateSendState(current, SendState.EXECUTE))
70                      {
71                          LOG.debug("Deferred content available, sending");
72                          send();
73                          return;
74                      }
75                      break;
76                  }
77                  case EXECUTE:
78                  {
79                      if (updateSendState(current, SendState.SCHEDULE))
80                      {
81                          LOG.debug("Deferred content available, scheduling");
82                          return;
83                      }
84                      break;
85                  }
86                  case SCHEDULE:
87                  {
88                      LOG.debug("Deferred content available, queueing");
89                      return;
90                  }
91                  default:
92                  {
93                      throw new IllegalStateException();
94                  }
95              }
96          }
97      }
98  
99      public void send(HttpExchange exchange)
100     {
101         if (!updateState(State.IDLE, State.BEGIN))
102             throw new IllegalStateException();
103 
104         Request request = exchange.getRequest();
105         Throwable cause = request.getAbortCause();
106         if (cause != null)
107         {
108             exchange.abort(cause);
109         }
110         else
111         {
112             LOG.debug("Sending {}", request);
113             RequestNotifier notifier = connection.getDestination().getRequestNotifier();
114             notifier.notifyBegin(request);
115 
116             ContentProvider content = request.getContent();
117             this.contentIterator = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
118 
119             boolean updated = updateSendState(SendState.IDLE, SendState.EXECUTE);
120             assert updated;
121 
122             // Setting the listener may trigger calls to onContent() by other
123             // threads so we must set it only after the state has been updated
124             if (content instanceof AsyncContentProvider)
125                 ((AsyncContentProvider)content).setListener(this);
126 
127             send();
128         }
129     }
130 
131     private void send()
132     {
133         SendState currentSendState = sendState.get();
134         assert currentSendState != SendState.IDLE : currentSendState;
135 
136         HttpClient client = connection.getHttpClient();
137         ByteBufferPool bufferPool = client.getByteBufferPool();
138         ByteBuffer header = null;
139         ByteBuffer chunk = null;
140         try
141         {
142             HttpExchange exchange = connection.getExchange();
143             // The exchange may be null if it failed concurrently
144             if (exchange == null)
145                 return;
146 
147             final Request request = exchange.getRequest();
148             HttpConversation conversation = exchange.getConversation();
149             HttpGenerator.RequestInfo requestInfo = null;
150 
151             // Determine whether we have already received the 100 Continue response or not
152             // If it was not received yet, we need to save the content and wait for it
153             boolean expect100HeaderPresent = request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
154             final boolean expecting100ContinueResponse = expect100HeaderPresent && conversation.getAttribute(EXPECT_100_ATTRIBUTE) == null;
155             if (expecting100ContinueResponse)
156                 conversation.setAttribute(EXPECT_100_ATTRIBUTE, Boolean.TRUE);
157 
158             ContentChunk contentChunk = continueContentChunk;
159             continueContentChunk = null;
160             if (contentChunk == null)
161                 contentChunk = new ContentChunk(contentIterator);
162 
163             while (true)
164             {
165                 ByteBuffer content = contentChunk.content;
166                 final ByteBuffer contentBuffer = content == null ? null : content.slice();
167 
168                 HttpGenerator.Result result = generator.generateRequest(requestInfo, header, chunk, content, contentChunk.lastContent);
169                 switch (result)
170                 {
171                     case NEED_INFO:
172                     {
173                         ContentProvider requestContent = request.getContent();
174                         long contentLength = requestContent == null ? -1 : requestContent.getLength();
175                         String path = request.getPath();
176                         String query = request.getQuery();
177                         if (query != null)
178                             path += "?" + query;
179                         requestInfo = new HttpGenerator.RequestInfo(request.getVersion(), request.getHeaders(), contentLength, request.method(), path);
180                         break;
181                     }
182                     case NEED_HEADER:
183                     {
184                         header = bufferPool.acquire(client.getRequestBufferSize(), false);
185                         break;
186                     }
187                     case NEED_CHUNK:
188                     {
189                         chunk = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
190                         break;
191                     }
192                     case FLUSH:
193                     {
194                         out:
195                         while (true)
196                         {
197                             State currentState = state.get();
198                             switch (currentState)
199                             {
200                                 case BEGIN:
201                                 {
202                                     if (!updateState(currentState, State.HEADERS))
203                                         continue;
204                                     RequestNotifier notifier = connection.getDestination().getRequestNotifier();
205                                     notifier.notifyHeaders(request);
206                                     break out;
207                                 }
208                                 case HEADERS:
209                                 case COMMIT:
210                                 {
211                                     // State update is performed after the write in commit()
212                                     break out;
213                                 }
214                                 case FAILURE:
215                                 {
216                                     // Failed concurrently, avoid the write since
217                                     // the connection is probably already closed
218                                     return;
219                                 }
220                                 default:
221                                 {
222                                     throw new IllegalStateException();
223                                 }
224                             }
225                         }
226 
227                         StatefulExecutorCallback callback = new StatefulExecutorCallback(client.getExecutor())
228                         {
229                             @Override
230                             protected void onSucceeded()
231                             {
232                                 LOG.debug("Write succeeded for {}", request);
233 
234                                 if (!processWrite(request, contentBuffer, expecting100ContinueResponse))
235                                     return;
236 
237                                 send();
238                             }
239 
240                             @Override
241                             protected void onFailed(Throwable x)
242                             {
243                                 fail(x);
244                             }
245                         };
246 
247                         if (expecting100ContinueResponse)
248                         {
249                             // Save the content waiting for the 100 Continue response
250                             continueContentChunk = new ContinueContentChunk(contentChunk);
251                         }
252 
253                         write(callback, header, chunk, expecting100ContinueResponse ? null : content);
254 
255                         if (callback.process())
256                         {
257                             LOG.debug("Write pending for {}", request);
258                             return;
259                         }
260 
261                         if (callback.isSucceeded())
262                         {
263                             if (!processWrite(request, contentBuffer, expecting100ContinueResponse))
264                                 return;
265 
266                             // Send further content
267                             contentChunk = new ContentChunk(contentIterator);
268 
269                             if (contentChunk.isDeferred())
270                             {
271                                 out:
272                                 while (true)
273                                 {
274                                     currentSendState = sendState.get();
275                                     switch (currentSendState)
276                                     {
277                                         case EXECUTE:
278                                         {
279                                             if (updateSendState(currentSendState, SendState.IDLE))
280                                             {
281                                                 LOG.debug("Waiting for deferred content for {}", request);
282                                                 return;
283                                             }
284                                             break;
285                                         }
286                                         case SCHEDULE:
287                                         {
288                                             if (updateSendState(currentSendState, SendState.EXECUTE))
289                                             {
290                                                 LOG.debug("Deferred content available for {}", request);
291                                                 break out;
292                                             }
293                                             break;
294                                         }
295                                         default:
296                                         {
297                                             throw new IllegalStateException();
298                                         }
299                                     }
300                                 }
301                             }
302                         }
303                         break;
304                     }
305                     case SHUTDOWN_OUT:
306                     {
307                         shutdownOutput();
308                         break;
309                     }
310                     case CONTINUE:
311                     {
312                         break;
313                     }
314                     case DONE:
315                     {
316                         if (generator.isEnd())
317                         {
318                             out: while (true)
319                             {
320                                 currentSendState = sendState.get();
321                                 switch (currentSendState)
322                                 {
323                                     case EXECUTE:
324                                     case SCHEDULE:
325                                     {
326                                         if (!updateSendState(currentSendState, SendState.IDLE))
327                                             throw new IllegalStateException();
328                                         break out;
329                                     }
330                                     default:
331                                     {
332                                         throw new IllegalStateException();
333                                     }
334                                 }
335                             }
336                             success();
337                         }
338                         return;
339                     }
340                     default:
341                     {
342                         throw new IllegalStateException("Unknown result " + result);
343                     }
344                 }
345             }
346         }
347         catch (Exception x)
348         {
349             LOG.debug(x);
350             fail(x);
351         }
352         finally
353         {
354             releaseBuffers(bufferPool, header, chunk);
355         }
356     }
357 
358     private boolean processWrite(Request request, ByteBuffer content, boolean expecting100ContinueResponse)
359     {
360         if (!commit(request))
361             return false;
362 
363         if (content != null && content.hasRemaining())
364         {
365             RequestNotifier notifier = connection.getDestination().getRequestNotifier();
366             notifier.notifyContent(request, content);
367         }
368 
369         if (expecting100ContinueResponse)
370         {
371             LOG.debug("Expecting 100 Continue for {}", request);
372             continueContentChunk.signal();
373             return false;
374         }
375 
376         return true;
377     }
378 
379     public void proceed(boolean proceed)
380     {
381         ContinueContentChunk contentChunk = continueContentChunk;
382         if (contentChunk != null)
383         {
384             if (proceed)
385             {
386                 // Method send() must not be executed concurrently.
387                 // The write in send() may arrive to the server and the server reply with 100 Continue
388                 // before send() exits; the processing of the 100 Continue will invoke this method
389                 // which in turn invokes send(), with the risk of a concurrent invocation of send().
390                 // Therefore we wait here on the ContinueContentChunk to send, and send() will signal
391                 // when it is ok to proceed.
392                 LOG.debug("Proceeding {}", connection.getExchange());
393                 contentChunk.await();
394                 send();
395             }
396             else
397             {
398                 HttpExchange exchange = connection.getExchange();
399                 if (exchange != null)
400                     fail(new HttpRequestException("Expectation failed", exchange.getRequest()));
401             }
402         }
403     }
404 
405     private void write(Callback callback, ByteBuffer header, ByteBuffer chunk, ByteBuffer content)
406     {
407         int mask = 0;
408         if (header != null)
409             mask += 1;
410         if (chunk != null)
411             mask += 2;
412         if (content != null)
413             mask += 4;
414 
415         EndPoint endPoint = connection.getEndPoint();
416         switch (mask)
417         {
418             case 0:
419                 endPoint.write(callback, BufferUtil.EMPTY_BUFFER);
420                 break;
421             case 1:
422                 endPoint.write(callback, header);
423                 break;
424             case 2:
425                 endPoint.write(callback, chunk);
426                 break;
427             case 3:
428                 endPoint.write(callback, header, chunk);
429                 break;
430             case 4:
431                 endPoint.write(callback, content);
432                 break;
433             case 5:
434                 endPoint.write(callback, header, content);
435                 break;
436             case 6:
437                 endPoint.write(callback, chunk, content);
438                 break;
439             case 7:
440                 endPoint.write(callback, header, chunk, content);
441                 break;
442             default:
443                 throw new IllegalStateException();
444         }
445     }
446 
447     protected boolean commit(Request request)
448     {
449         while (true)
450         {
451             State current = state.get();
452             switch (current)
453             {
454                 case HEADERS:
455                     if (!updateState(current, State.COMMIT))
456                         continue;
457                     LOG.debug("Committed {}", request);
458                     RequestNotifier notifier = connection.getDestination().getRequestNotifier();
459                     notifier.notifyCommit(request);
460                     return true;
461                 case COMMIT:
462                     if (!updateState(current, State.COMMIT))
463                         continue;
464                     return true;
465                 case FAILURE:
466                     return false;
467                 default:
468                     throw new IllegalStateException();
469             }
470         }
471     }
472 
473     protected boolean success()
474     {
475         HttpExchange exchange = connection.getExchange();
476         if (exchange == null)
477             return false;
478 
479         AtomicMarkableReference<Result> completion = exchange.requestComplete(null);
480         if (!completion.isMarked())
481             return false;
482 
483         generator.reset();
484 
485         if (!updateState(State.COMMIT, State.IDLE))
486             throw new IllegalStateException();
487 
488         exchange.terminateRequest();
489 
490         // It is important to notify completion *after* we reset because
491         // the notification may trigger another request/response
492 
493         HttpDestination destination = connection.getDestination();
494         Request request = exchange.getRequest();
495         destination.getRequestNotifier().notifySuccess(request);
496         LOG.debug("Sent {}", request);
497 
498         Result result = completion.getReference();
499         if (result != null)
500         {
501             connection.complete(exchange, !result.isFailed());
502 
503             HttpConversation conversation = exchange.getConversation();
504             destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
505         }
506 
507         return true;
508     }
509 
510     protected boolean fail(Throwable failure)
511     {
512         HttpExchange exchange = connection.getExchange();
513         if (exchange == null)
514             return false;
515 
516         AtomicMarkableReference<Result> completion = exchange.requestComplete(failure);
517         if (!completion.isMarked())
518             return false;
519 
520         generator.abort();
521 
522         State current;
523         while (true)
524         {
525             current = state.get();
526             if (updateState(current, State.FAILURE))
527                 break;
528         }
529 
530         shutdownOutput();
531 
532         exchange.terminateRequest();
533 
534         HttpDestination destination = connection.getDestination();
535         Request request = exchange.getRequest();
536         destination.getRequestNotifier().notifyFailure(request, failure);
537         LOG.debug("Failed {} {}", request, failure);
538 
539         Result result = completion.getReference();
540         boolean notCommitted = isBeforeCommit(current);
541         if (result == null && notCommitted && request.getAbortCause() == null)
542         {
543             completion = exchange.responseComplete(failure);
544             if (completion.isMarked())
545             {
546                 result = completion.getReference();
547                 exchange.terminateResponse();
548                 LOG.debug("Failed on behalf {}", exchange);
549             }
550         }
551 
552         if (result != null)
553         {
554             connection.complete(exchange, false);
555 
556             HttpConversation conversation = exchange.getConversation();
557             destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
558         }
559 
560         return true;
561     }
562 
563     private void shutdownOutput()
564     {
565         connection.getEndPoint().shutdownOutput();
566     }
567 
568     public boolean abort(Throwable cause)
569     {
570         State current = state.get();
571         boolean abortable = isBeforeCommit(current) ||
572                 current == State.COMMIT && contentIterator.hasNext();
573         return abortable && fail(cause);
574     }
575 
576     private boolean isBeforeCommit(State state)
577     {
578         return state == State.IDLE || state == State.BEGIN || state == State.HEADERS;
579     }
580 
581     private void releaseBuffers(ByteBufferPool bufferPool, ByteBuffer header, ByteBuffer chunk)
582     {
583         if (!BufferUtil.hasContent(header))
584             bufferPool.release(header);
585         if (!BufferUtil.hasContent(chunk))
586             bufferPool.release(chunk);
587     }
588 
589     private boolean updateState(State from, State to)
590     {
591         boolean updated = state.compareAndSet(from, to);
592         if (!updated)
593             LOG.debug("State update failed: {} -> {}: {}", from, to, state.get());
594         return updated;
595     }
596 
597     private boolean updateSendState(SendState from, SendState to)
598     {
599         boolean updated = sendState.compareAndSet(from, to);
600         if (!updated)
601             LOG.debug("Send state update failed: {} -> {}: {}", from, to, sendState.get());
602         return updated;
603     }
604 
605     private enum State
606     {
607         IDLE, BEGIN, HEADERS, COMMIT, FAILURE
608     }
609 
610     private enum SendState
611     {
612         IDLE, EXECUTE, SCHEDULE
613     }
614 
615     private static abstract class StatefulExecutorCallback implements Callback, Runnable
616     {
617         private final AtomicReference<State> state = new AtomicReference<>(State.INCOMPLETE);
618         private final Executor executor;
619 
620         private StatefulExecutorCallback(Executor executor)
621         {
622             this.executor = executor;
623         }
624 
625         @Override
626         public final void succeeded()
627         {
628             State previous = state.get();
629             while (true)
630             {
631                 if (state.compareAndSet(previous, State.SUCCEEDED))
632                     break;
633                 previous = state.get();
634             }
635             if (previous == State.PENDING)
636                 executor.execute(this);
637         }
638 
639         @Override
640         public final void run()
641         {
642             onSucceeded();
643         }
644 
645         protected abstract void onSucceeded();
646 
647         @Override
648         public final void failed(final Throwable x)
649         {
650             State previous = state.get();
651             while (true)
652             {
653                 if (state.compareAndSet(previous, State.FAILED))
654                     break;
655                 previous = state.get();
656             }
657             if (previous == State.PENDING)
658             {
659                 executor.execute(new Runnable()
660                 {
661                     @Override
662                     public void run()
663                     {
664                         onFailed(x);
665                     }
666                 });
667             }
668             else
669             {
670                 onFailed(x);
671             }
672         }
673 
674         protected abstract void onFailed(Throwable x);
675 
676         public boolean process()
677         {
678             return state.compareAndSet(State.INCOMPLETE, State.PENDING);
679         }
680 
681         public boolean isSucceeded()
682         {
683             return state.get() == State.SUCCEEDED;
684         }
685 
686         public boolean isFailed()
687         {
688             return state.get() == State.FAILED;
689         }
690 
691         private enum State
692         {
693             INCOMPLETE, PENDING, SUCCEEDED, FAILED
694         }
695     }
696 
697     private class ContentChunk
698     {
699         private final boolean lastContent;
700         private final ByteBuffer content;
701 
702         private ContentChunk(ContentChunk chunk)
703         {
704             lastContent = chunk.lastContent;
705             content = chunk.content;
706         }
707 
708         private ContentChunk(Iterator<ByteBuffer> contentIterator)
709         {
710             lastContent = !contentIterator.hasNext();
711             content = lastContent ? BufferUtil.EMPTY_BUFFER : contentIterator.next();
712         }
713 
714         private boolean isDeferred()
715         {
716             return content == null && !lastContent;
717         }
718     }
719 
720     private class ContinueContentChunk extends ContentChunk
721     {
722         private final CountDownLatch latch = new CountDownLatch(1);
723 
724         private ContinueContentChunk(ContentChunk chunk)
725         {
726             super(chunk);
727         }
728 
729         private void signal()
730         {
731             latch.countDown();
732         }
733 
734         private void await()
735         {
736             try
737             {
738                 latch.await();
739             }
740             catch (InterruptedException x)
741             {
742                 LOG.ignore(x);
743             }
744         }
745     }
746 }