View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.nio.ByteBuffer;
22  import java.util.Collections;
23  import java.util.Iterator;
24  import java.util.concurrent.CountDownLatch;
25  import java.util.concurrent.Executor;
26  import java.util.concurrent.atomic.AtomicMarkableReference;
27  import java.util.concurrent.atomic.AtomicReference;
28  
29  import org.eclipse.jetty.client.api.ContentProvider;
30  import org.eclipse.jetty.client.api.Request;
31  import org.eclipse.jetty.client.api.Result;
32  import org.eclipse.jetty.http.HttpGenerator;
33  import org.eclipse.jetty.http.HttpHeader;
34  import org.eclipse.jetty.http.HttpHeaderValue;
35  import org.eclipse.jetty.io.ByteBufferPool;
36  import org.eclipse.jetty.io.EndPoint;
37  import org.eclipse.jetty.util.BufferUtil;
38  import org.eclipse.jetty.util.Callback;
39  import org.eclipse.jetty.util.log.Log;
40  import org.eclipse.jetty.util.log.Logger;
41  
42  public class HttpSender implements AsyncContentProvider.Listener
43  {
44      private static final Logger LOG = Log.getLogger(HttpSender.class);
45      private static final String EXPECT_100_ATTRIBUTE = HttpSender.class.getName() + ".expect100";
46  
47      private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
48      private final AtomicReference<SendState> sendState = new AtomicReference<>(SendState.IDLE);
49      private final HttpGenerator generator = new HttpGenerator();
50      private final HttpConnection connection;
51      private Iterator<ByteBuffer> contentIterator;
52      private ContinueContentChunk continueContentChunk;
53  
54      public HttpSender(HttpConnection connection)
55      {
56          this.connection = connection;
57      }
58  
59      @Override
60      public void onContent()
61      {
62          while (true)
63          {
64              SendState current = sendState.get();
65              switch (current)
66              {
67                  case IDLE:
68                  {
69                      if (updateSendState(current, SendState.EXECUTE))
70                      {
71                          LOG.debug("Deferred content available, sending");
72                          send();
73                          return;
74                      }
75                      break;
76                  }
77                  case EXECUTE:
78                  {
79                      if (updateSendState(current, SendState.SCHEDULE))
80                      {
81                          LOG.debug("Deferred content available, scheduling");
82                          return;
83                      }
84                      break;
85                  }
86                  case SCHEDULE:
87                  {
88                      LOG.debug("Deferred content available, queueing");
89                      return;
90                  }
91                  default:
92                  {
93                      throw new IllegalStateException();
94                  }
95              }
96          }
97      }
98  
99      public void send(HttpExchange exchange)
100     {
101         if (!updateState(State.IDLE, State.BEGIN))
102             throw new IllegalStateException();
103 
104         Request request = exchange.getRequest();
105         Throwable cause = request.getAbortCause();
106         if (cause != null)
107         {
108             exchange.abort(cause);
109         }
110         else
111         {
112             LOG.debug("Sending {}", request);
113             RequestNotifier notifier = connection.getDestination().getRequestNotifier();
114             notifier.notifyBegin(request);
115 
116             ContentProvider content = request.getContent();
117             this.contentIterator = content == null ? Collections.<ByteBuffer>emptyIterator() : content.iterator();
118 
119             boolean updated = updateSendState(SendState.IDLE, SendState.EXECUTE);
120             assert updated;
121 
122             // Setting the listener may trigger calls to onContent() by other
123             // threads so we must set it only after the state has been updated
124             if (content instanceof AsyncContentProvider)
125                 ((AsyncContentProvider)content).setListener(this);
126 
127             send();
128         }
129     }
130 
131     private void send()
132     {
133         SendState currentSendState = sendState.get();
134         assert currentSendState != SendState.IDLE : currentSendState;
135 
136         HttpClient client = connection.getHttpClient();
137         ByteBufferPool bufferPool = client.getByteBufferPool();
138         ByteBuffer header = null;
139         ByteBuffer chunk = null;
140         try
141         {
142             HttpExchange exchange = connection.getExchange();
143             // The exchange may be null if it failed concurrently
144             if (exchange == null)
145                 return;
146 
147             final Request request = exchange.getRequest();
148             HttpConversation conversation = exchange.getConversation();
149             HttpGenerator.RequestInfo requestInfo = null;
150 
151             // Determine whether we have already received the 100 Continue response or not
152             // If it was not received yet, we need to save the content and wait for it
153             boolean expect100HeaderPresent = request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
154             final boolean expecting100ContinueResponse = expect100HeaderPresent && conversation.getAttribute(EXPECT_100_ATTRIBUTE) == null;
155             if (expecting100ContinueResponse)
156                 conversation.setAttribute(EXPECT_100_ATTRIBUTE, Boolean.TRUE);
157 
158             ContentChunk contentChunk = continueContentChunk;
159             continueContentChunk = null;
160             if (contentChunk == null)
161                 contentChunk = new ContentChunk(contentIterator);
162 
163             while (true)
164             {
165                 ByteBuffer content = contentChunk.content;
166                 final ByteBuffer contentBuffer = content == null ? null : content.slice();
167 
168                 HttpGenerator.Result result = generator.generateRequest(requestInfo, header, chunk, content, contentChunk.lastContent);
169                 switch (result)
170                 {
171                     case NEED_INFO:
172                     {
173                         ContentProvider requestContent = request.getContent();
174                         long contentLength = requestContent == null ? -1 : requestContent.getLength();
175                         requestInfo = new HttpGenerator.RequestInfo(request.getVersion(), request.getHeaders(), contentLength, request.getMethod().asString(), request.getPath());
176                         break;
177                     }
178                     case NEED_HEADER:
179                     {
180                         header = bufferPool.acquire(client.getRequestBufferSize(), false);
181                         break;
182                     }
183                     case NEED_CHUNK:
184                     {
185                         chunk = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
186                         break;
187                     }
188                     case FLUSH:
189                     {
190                         out:
191                         while (true)
192                         {
193                             State currentState = state.get();
194                             switch (currentState)
195                             {
196                                 case BEGIN:
197                                 {
198                                     if (!updateState(currentState, State.HEADERS))
199                                         continue;
200                                     RequestNotifier notifier = connection.getDestination().getRequestNotifier();
201                                     notifier.notifyHeaders(request);
202                                     break out;
203                                 }
204                                 case HEADERS:
205                                 case COMMIT:
206                                 {
207                                     // State update is performed after the write in commit()
208                                     break out;
209                                 }
210                                 case FAILURE:
211                                 {
212                                     // Failed concurrently, avoid the write since
213                                     // the connection is probably already closed
214                                     return;
215                                 }
216                                 default:
217                                 {
218                                     throw new IllegalStateException();
219                                 }
220                             }
221                         }
222 
223                         StatefulExecutorCallback callback = new StatefulExecutorCallback(client.getExecutor())
224                         {
225                             @Override
226                             protected void onSucceeded()
227                             {
228                                 LOG.debug("Write succeeded for {}", request);
229 
230                                 if (!processWrite(request, contentBuffer, expecting100ContinueResponse))
231                                     return;
232 
233                                 send();
234                             }
235 
236                             @Override
237                             protected void onFailed(Throwable x)
238                             {
239                                 fail(x);
240                             }
241                         };
242 
243                         if (expecting100ContinueResponse)
244                         {
245                             // Save the content waiting for the 100 Continue response
246                             continueContentChunk = new ContinueContentChunk(contentChunk);
247                         }
248 
249                         write(callback, header, chunk, expecting100ContinueResponse ? null : content);
250 
251                         if (callback.process())
252                         {
253                             LOG.debug("Write pending for {}", request);
254                             return;
255                         }
256 
257                         if (callback.isSucceeded())
258                         {
259                             if (!processWrite(request, contentBuffer, expecting100ContinueResponse))
260                                 return;
261 
262                             // Send further content
263                             contentChunk = new ContentChunk(contentIterator);
264 
265                             if (contentChunk.isDeferred())
266                             {
267                                 out:
268                                 while (true)
269                                 {
270                                     currentSendState = sendState.get();
271                                     switch (currentSendState)
272                                     {
273                                         case EXECUTE:
274                                         {
275                                             if (updateSendState(currentSendState, SendState.IDLE))
276                                             {
277                                                 LOG.debug("Waiting for deferred content for {}", request);
278                                                 return;
279                                             }
280                                             break;
281                                         }
282                                         case SCHEDULE:
283                                         {
284                                             if (updateSendState(currentSendState, SendState.EXECUTE))
285                                             {
286                                                 LOG.debug("Deferred content available for {}", request);
287                                                 break out;
288                                             }
289                                             break;
290                                         }
291                                         default:
292                                         {
293                                             throw new IllegalStateException();
294                                         }
295                                     }
296                                 }
297                             }
298                         }
299                         break;
300                     }
301                     case SHUTDOWN_OUT:
302                     {
303                         EndPoint endPoint = connection.getEndPoint();
304                         endPoint.shutdownOutput();
305                         break;
306                     }
307                     case CONTINUE:
308                     {
309                         break;
310                     }
311                     case DONE:
312                     {
313                         if (generator.isEnd())
314                         {
315                             out: while (true)
316                             {
317                                 currentSendState = sendState.get();
318                                 switch (currentSendState)
319                                 {
320                                     case EXECUTE:
321                                     case SCHEDULE:
322                                     {
323                                         if (!updateSendState(currentSendState, SendState.IDLE))
324                                             throw new IllegalStateException();
325                                         break out;
326                                     }
327                                     default:
328                                     {
329                                         throw new IllegalStateException();
330                                     }
331                                 }
332                             }
333                             success();
334                         }
335                         return;
336                     }
337                     default:
338                     {
339                         throw new IllegalStateException("Unknown result " + result);
340                     }
341                 }
342             }
343         }
344         catch (Exception x)
345         {
346             LOG.debug(x);
347             fail(x);
348         }
349         finally
350         {
351             releaseBuffers(bufferPool, header, chunk);
352         }
353     }
354 
355     private boolean processWrite(Request request, ByteBuffer content, boolean expecting100ContinueResponse)
356     {
357         if (!commit(request))
358             return false;
359 
360         if (content != null)
361         {
362             RequestNotifier notifier = connection.getDestination().getRequestNotifier();
363             notifier.notifyContent(request, content);
364         }
365 
366         if (expecting100ContinueResponse)
367         {
368             LOG.debug("Expecting 100 Continue for {}", request);
369             continueContentChunk.signal();
370             return false;
371         }
372 
373         return true;
374     }
375 
376     public void proceed(boolean proceed)
377     {
378         ContinueContentChunk contentChunk = continueContentChunk;
379         if (contentChunk != null)
380         {
381             if (proceed)
382             {
383                 // Method send() must not be executed concurrently.
384                 // The write in send() may arrive to the server and the server reply with 100 Continue
385                 // before send() exits; the processing of the 100 Continue will invoke this method
386                 // which in turn invokes send(), with the risk of a concurrent invocation of send().
387                 // Therefore we wait here on the ContinueContentChunk to send, and send() will signal
388                 // when it is ok to proceed.
389                 LOG.debug("Proceeding {}", connection.getExchange());
390                 contentChunk.await();
391                 send();
392             }
393             else
394             {
395                 HttpExchange exchange = connection.getExchange();
396                 if (exchange != null)
397                     fail(new HttpRequestException("Expectation failed", exchange.getRequest()));
398             }
399         }
400     }
401 
402     private void write(Callback callback, ByteBuffer header, ByteBuffer chunk, ByteBuffer content)
403     {
404         int mask = 0;
405         if (header != null)
406             mask += 1;
407         if (chunk != null)
408             mask += 2;
409         if (content != null)
410             mask += 4;
411 
412         EndPoint endPoint = connection.getEndPoint();
413         switch (mask)
414         {
415             case 0:
416                 endPoint.write(callback, BufferUtil.EMPTY_BUFFER);
417                 break;
418             case 1:
419                 endPoint.write(callback, header);
420                 break;
421             case 2:
422                 endPoint.write(callback, chunk);
423                 break;
424             case 3:
425                 endPoint.write(callback, header, chunk);
426                 break;
427             case 4:
428                 endPoint.write(callback, content);
429                 break;
430             case 5:
431                 endPoint.write(callback, header, content);
432                 break;
433             case 6:
434                 endPoint.write(callback, chunk, content);
435                 break;
436             case 7:
437                 endPoint.write(callback, header, chunk, content);
438                 break;
439             default:
440                 throw new IllegalStateException();
441         }
442     }
443 
444     protected boolean commit(Request request)
445     {
446         while (true)
447         {
448             State current = state.get();
449             switch (current)
450             {
451                 case HEADERS:
452                     if (!updateState(current, State.COMMIT))
453                         continue;
454                     LOG.debug("Committed {}", request);
455                     RequestNotifier notifier = connection.getDestination().getRequestNotifier();
456                     notifier.notifyCommit(request);
457                     return true;
458                 case COMMIT:
459                     if (!updateState(current, State.COMMIT))
460                         continue;
461                     return true;
462                 case FAILURE:
463                     return false;
464                 default:
465                     throw new IllegalStateException();
466             }
467         }
468     }
469 
470     protected boolean success()
471     {
472         HttpExchange exchange = connection.getExchange();
473         if (exchange == null)
474             return false;
475 
476         AtomicMarkableReference<Result> completion = exchange.requestComplete(null);
477         if (!completion.isMarked())
478             return false;
479 
480         generator.reset();
481 
482         if (!updateState(State.COMMIT, State.IDLE))
483             throw new IllegalStateException();
484 
485         exchange.terminateRequest();
486 
487         // It is important to notify completion *after* we reset because
488         // the notification may trigger another request/response
489 
490         HttpDestination destination = connection.getDestination();
491         Request request = exchange.getRequest();
492         destination.getRequestNotifier().notifySuccess(request);
493         LOG.debug("Sent {}", request);
494 
495         Result result = completion.getReference();
496         if (result != null)
497         {
498             connection.complete(exchange, !result.isFailed());
499 
500             HttpConversation conversation = exchange.getConversation();
501             destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
502         }
503 
504         return true;
505     }
506 
507     protected boolean fail(Throwable failure)
508     {
509         HttpExchange exchange = connection.getExchange();
510         if (exchange == null)
511             return false;
512 
513         AtomicMarkableReference<Result> completion = exchange.requestComplete(failure);
514         if (!completion.isMarked())
515             return false;
516 
517         generator.abort();
518 
519         State current;
520         while (true)
521         {
522             current = state.get();
523             if (updateState(current, State.FAILURE))
524                 break;
525         }
526 
527         exchange.terminateRequest();
528 
529         HttpDestination destination = connection.getDestination();
530         Request request = exchange.getRequest();
531         destination.getRequestNotifier().notifyFailure(request, failure);
532         LOG.debug("Failed {} {}", request, failure);
533 
534         Result result = completion.getReference();
535         boolean notCommitted = isBeforeCommit(current);
536         if (result == null && notCommitted && request.getAbortCause() == null)
537         {
538             result = exchange.responseComplete(failure).getReference();
539             exchange.terminateResponse();
540             LOG.debug("Failed on behalf {}", exchange);
541         }
542 
543         if (result != null)
544         {
545             connection.complete(exchange, false);
546 
547             HttpConversation conversation = exchange.getConversation();
548             destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
549         }
550 
551         return true;
552     }
553 
554     public boolean abort(Throwable cause)
555     {
556         State current = state.get();
557         boolean abortable = isBeforeCommit(current) ||
558                 current == State.COMMIT && contentIterator.hasNext();
559         return abortable && fail(cause);
560     }
561 
562     private boolean isBeforeCommit(State state)
563     {
564         return state == State.IDLE || state == State.BEGIN || state == State.HEADERS;
565     }
566 
567     private void releaseBuffers(ByteBufferPool bufferPool, ByteBuffer header, ByteBuffer chunk)
568     {
569         if (!BufferUtil.hasContent(header))
570             bufferPool.release(header);
571         if (!BufferUtil.hasContent(chunk))
572             bufferPool.release(chunk);
573     }
574 
575     private boolean updateState(State from, State to)
576     {
577         boolean updated = state.compareAndSet(from, to);
578         if (!updated)
579             LOG.debug("State update failed: {} -> {}: {}", from, to, state.get());
580         return updated;
581     }
582 
583     private boolean updateSendState(SendState from, SendState to)
584     {
585         boolean updated = sendState.compareAndSet(from, to);
586         if (!updated)
587             LOG.debug("Send state update failed: {} -> {}: {}", from, to, sendState.get());
588         return updated;
589     }
590 
591     private enum State
592     {
593         IDLE, BEGIN, HEADERS, COMMIT, FAILURE
594     }
595 
596     private enum SendState
597     {
598         IDLE, EXECUTE, SCHEDULE
599     }
600 
601     private static abstract class StatefulExecutorCallback implements Callback, Runnable
602     {
603         private final AtomicReference<State> state = new AtomicReference<>(State.INCOMPLETE);
604         private final Executor executor;
605 
606         private StatefulExecutorCallback(Executor executor)
607         {
608             this.executor = executor;
609         }
610 
611         @Override
612         public final void succeeded()
613         {
614             State previous = state.get();
615             while (true)
616             {
617                 if (state.compareAndSet(previous, State.SUCCEEDED))
618                     break;
619                 previous = state.get();
620             }
621             if (previous == State.PENDING)
622                 executor.execute(this);
623         }
624 
625         @Override
626         public final void run()
627         {
628             onSucceeded();
629         }
630 
631         protected abstract void onSucceeded();
632 
633         @Override
634         public final void failed(final Throwable x)
635         {
636             State previous = state.get();
637             while (true)
638             {
639                 if (state.compareAndSet(previous, State.FAILED))
640                     break;
641                 previous = state.get();
642             }
643             if (previous == State.PENDING)
644             {
645                 executor.execute(new Runnable()
646                 {
647                     @Override
648                     public void run()
649                     {
650                         onFailed(x);
651                     }
652                 });
653             }
654             else
655             {
656                 onFailed(x);
657             }
658         }
659 
660         protected abstract void onFailed(Throwable x);
661 
662         public boolean process()
663         {
664             return state.compareAndSet(State.INCOMPLETE, State.PENDING);
665         }
666 
667         public boolean isSucceeded()
668         {
669             return state.get() == State.SUCCEEDED;
670         }
671 
672         public boolean isFailed()
673         {
674             return state.get() == State.FAILED;
675         }
676 
677         private enum State
678         {
679             INCOMPLETE, PENDING, SUCCEEDED, FAILED
680         }
681     }
682 
683     private class ContentChunk
684     {
685         private final boolean lastContent;
686         private final ByteBuffer content;
687 
688         private ContentChunk(ContentChunk chunk)
689         {
690             lastContent = chunk.lastContent;
691             content = chunk.content;
692         }
693 
694         private ContentChunk(Iterator<ByteBuffer> contentIterator)
695         {
696             lastContent = !contentIterator.hasNext();
697             content = lastContent ? BufferUtil.EMPTY_BUFFER : contentIterator.next();
698         }
699 
700         private boolean isDeferred()
701         {
702             return content == null && !lastContent;
703         }
704     }
705 
706     private class ContinueContentChunk extends ContentChunk
707     {
708         private final CountDownLatch latch = new CountDownLatch(1);
709 
710         private ContinueContentChunk(ContentChunk chunk)
711         {
712             super(chunk);
713         }
714 
715         private void signal()
716         {
717             latch.countDown();
718         }
719 
720         private void await()
721         {
722             try
723             {
724                 latch.await();
725             }
726             catch (InterruptedException x)
727             {
728                 LOG.ignore(x);
729             }
730         }
731     }
732 }