View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.Enumeration;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.concurrent.atomic.AtomicReference;
31  
32  import org.eclipse.jetty.client.api.Response;
33  import org.eclipse.jetty.client.api.Result;
34  import org.eclipse.jetty.http.HttpField;
35  import org.eclipse.jetty.http.HttpHeader;
36  import org.eclipse.jetty.http.HttpStatus;
37  import org.eclipse.jetty.util.BufferUtil;
38  import org.eclipse.jetty.util.Callback;
39  import org.eclipse.jetty.util.CountingCallback;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  
43  /**
44   * {@link HttpReceiver} provides the abstract code to implement the various steps of receiving HTTP responses.
45   * <p>
46   * {@link HttpReceiver} maintains a state machine that is updated when the steps of receiving a response are executed.
47   * <p>
48   * Subclasses must handle the transport-specific details, for example how to read from the raw socket and how to parse
49   * the bytes read from the socket. Then they have to call the methods defined in this class in the following order:
50   * <ol>
51   * <li>{@link #responseBegin(HttpExchange)}, when the HTTP response data containing the HTTP status code
52   * is available</li>
53   * <li>{@link #responseHeader(HttpExchange, HttpField)}, when an HTTP field is available</li>
54   * <li>{@link #responseHeaders(HttpExchange)}, when all HTTP headers are available</li>
55   * <li>{@link #responseContent(HttpExchange, ByteBuffer, Callback)}, when HTTP content is available</li>
56   * <li>{@link #responseSuccess(HttpExchange)}, when the response is successful</li>
57   * </ol>
58   * At any time, subclasses may invoke {@link #responseFailure(Throwable)} to indicate that the response has failed
59   * (for example, because of I/O exceptions).
60   * At any time, user threads may abort the response which will cause {@link #responseFailure(Throwable)} to be
61   * invoked.
62   * <p>
63   * The state machine maintained by this class ensures that the response steps are not executed by an I/O thread
64   * if the response has already been failed.
65   *
66   * @see HttpSender
67   */
68  public abstract class HttpReceiver
69  {
70      protected static final Logger LOG = Log.getLogger(HttpReceiver.class);
71  
72      private final AtomicReference<ResponseState> responseState = new AtomicReference<>(ResponseState.IDLE);
73      private final HttpChannel channel;
74      private ContentDecoder decoder;
75      private Throwable failure;
76  
77      protected HttpReceiver(HttpChannel channel)
78      {
79          this.channel = channel;
80      }
81  
82      protected HttpChannel getHttpChannel()
83      {
84          return channel;
85      }
86  
87      protected HttpExchange getHttpExchange()
88      {
89          return channel.getHttpExchange();
90      }
91  
92      protected HttpDestination getHttpDestination()
93      {
94          return channel.getHttpDestination();
95      }
96  
97      /**
98       * Method to be invoked when the response status code is available.
99       * <p>
100      * Subclasses must have set the response status code on the {@link Response} object of the {@link HttpExchange}
101      * prior invoking this method.
102      * <p>
103      * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.BeginListener}s.
104      *
105      * @param exchange the HTTP exchange
106      * @return whether the processing should continue
107      */
108     protected boolean responseBegin(HttpExchange exchange)
109     {
110         if (!updateResponseState(ResponseState.IDLE, ResponseState.TRANSIENT))
111             return false;
112 
113         HttpConversation conversation = exchange.getConversation();
114         HttpResponse response = exchange.getResponse();
115         // Probe the protocol handlers
116         HttpDestination destination = getHttpDestination();
117         HttpClient client = destination.getHttpClient();
118         ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
119         Response.Listener handlerListener = null;
120         if (protocolHandler != null)
121         {
122             handlerListener = protocolHandler.getResponseListener();
123             if (LOG.isDebugEnabled())
124                 LOG.debug("Found protocol handler {}", protocolHandler);
125         }
126         exchange.getConversation().updateResponseListeners(handlerListener);
127 
128         if (LOG.isDebugEnabled())
129             LOG.debug("Response begin {}", response);
130         ResponseNotifier notifier = destination.getResponseNotifier();
131         notifier.notifyBegin(conversation.getResponseListeners(), response);
132 
133         if (updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
134             return true;
135 
136         terminateResponse(exchange);
137         return false;
138     }
139 
140     /**
141      * Method to be invoked when a response HTTP header is available.
142      * <p>
143      * Subclasses must not have added the header to the {@link Response} object of the {@link HttpExchange}
144      * prior invoking this method.
145      * <p>
146      * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.HeaderListener}s and storing cookies.
147      *
148      * @param exchange the HTTP exchange
149      * @param field the response HTTP field
150      * @return whether the processing should continue
151      */
152     protected boolean responseHeader(HttpExchange exchange, HttpField field)
153     {
154         out: while (true)
155         {
156             ResponseState current = responseState.get();
157             switch (current)
158             {
159                 case BEGIN:
160                 case HEADER:
161                 {
162                     if (updateResponseState(current, ResponseState.TRANSIENT))
163                         break out;
164                     break;
165                 }
166                 default:
167                 {
168                     return false;
169                 }
170             }
171         }
172 
173         HttpResponse response = exchange.getResponse();
174         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
175         boolean process = notifier.notifyHeader(exchange.getConversation().getResponseListeners(), response, field);
176         if (process)
177         {
178             response.getHeaders().add(field);
179             HttpHeader fieldHeader = field.getHeader();
180             if (fieldHeader != null)
181             {
182                 switch (fieldHeader)
183                 {
184                     case SET_COOKIE:
185                     case SET_COOKIE2:
186                     {
187                         URI uri = exchange.getRequest().getURI();
188                         if (uri != null)
189                             storeCookie(uri, field);
190                         break;
191                     }
192                     default:
193                     {
194                         break;
195                     }
196                 }
197             }
198         }
199 
200         if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
201             return true;
202 
203         terminateResponse(exchange);
204         return false;
205     }
206 
207     protected void storeCookie(URI uri, HttpField field)
208     {
209         try
210         {
211             String value = field.getValue();
212             if (value != null)
213             {
214                 Map<String, List<String>> header = new HashMap<>(1);
215                 header.put(field.getHeader().asString(), Collections.singletonList(value));
216                 getHttpDestination().getHttpClient().getCookieManager().put(uri, header);
217             }
218         }
219         catch (IOException x)
220         {
221             if (LOG.isDebugEnabled())
222                 LOG.debug(x);
223         }
224     }
225 
226     /**
227      * Method to be invoked after all response HTTP headers are available.
228      * <p>
229      * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.HeadersListener}s.
230      *
231      * @param exchange the HTTP exchange
232      * @return whether the processing should continue
233      */
234     protected boolean responseHeaders(HttpExchange exchange)
235     {
236         out: while (true)
237         {
238             ResponseState current = responseState.get();
239             switch (current)
240             {
241                 case BEGIN:
242                 case HEADER:
243                 {
244                     if (updateResponseState(current, ResponseState.TRANSIENT))
245                         break out;
246                     break;
247                 }
248                 default:
249                 {
250                     return false;
251                 }
252             }
253         }
254 
255         HttpResponse response = exchange.getResponse();
256         if (LOG.isDebugEnabled())
257             LOG.debug("Response headers {}{}{}", response, System.lineSeparator(), response.getHeaders().toString().trim());
258         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
259         notifier.notifyHeaders(exchange.getConversation().getResponseListeners(), response);
260 
261         Enumeration<String> contentEncodings = response.getHeaders().getValues(HttpHeader.CONTENT_ENCODING.asString(), ",");
262         if (contentEncodings != null)
263         {
264             for (ContentDecoder.Factory factory : getHttpDestination().getHttpClient().getContentDecoderFactories())
265             {
266                 while (contentEncodings.hasMoreElements())
267                 {
268                     if (factory.getEncoding().equalsIgnoreCase(contentEncodings.nextElement()))
269                     {
270                         this.decoder = factory.newContentDecoder();
271                         break;
272                     }
273                 }
274             }
275         }
276 
277         if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS))
278             return true;
279 
280         terminateResponse(exchange);
281         return false;
282     }
283 
284     /**
285      * Method to be invoked when response HTTP content is available.
286      * <p>
287      * This method takes case of decoding the content, if necessary, and notifying {@link org.eclipse.jetty.client.api.Response.ContentListener}s.
288      *
289      * @param exchange the HTTP exchange
290      * @param buffer the response HTTP content buffer
291      * @param callback the callback
292      * @return whether the processing should continue
293      */
294     protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, final Callback callback)
295     {
296         out: while (true)
297         {
298             ResponseState current = responseState.get();
299             switch (current)
300             {
301                 case HEADERS:
302                 case CONTENT:
303                 {
304                     if (updateResponseState(current, ResponseState.TRANSIENT))
305                         break out;
306                     break;
307                 }
308                 default:
309                 {
310                     return false;
311                 }
312             }
313         }
314 
315         HttpResponse response = exchange.getResponse();
316         if (LOG.isDebugEnabled())
317             LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
318 
319         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
320         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
321 
322         ContentDecoder decoder = this.decoder;
323         if (decoder == null)
324         {
325             notifier.notifyContent(listeners, response, buffer, callback);
326         }
327         else
328         {
329             try
330             {
331                 List<ByteBuffer> decodeds = new ArrayList<>(2);
332                 while (buffer.hasRemaining())
333                 {
334                     ByteBuffer decoded = decoder.decode(buffer);
335                     if (!decoded.hasRemaining())
336                         continue;
337                     decodeds.add(decoded);
338                     if (LOG.isDebugEnabled())
339                         LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
340                 }
341 
342                 if (decodeds.isEmpty())
343                 {
344                     callback.succeeded();
345                 }
346                 else
347                 {
348                     int size = decodeds.size();
349                     CountingCallback counter = new CountingCallback(callback, size);
350                     for (int i = 0; i < size; ++i)
351                         notifier.notifyContent(listeners, response, decodeds.get(i), counter);
352                 }
353             }
354             catch (Throwable x)
355             {
356                 callback.failed(x);
357             }
358         }
359 
360         if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
361             return true;
362 
363         terminateResponse(exchange);
364         return false;
365     }
366 
367     /**
368      * Method to be invoked when the response is successful.
369      * <p>
370      * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.SuccessListener}s and possibly
371      * {@link org.eclipse.jetty.client.api.Response.CompleteListener}s (if the exchange is completed).
372      *
373      * @param exchange the HTTP exchange
374      * @return whether the response was processed as successful
375      */
376     protected boolean responseSuccess(HttpExchange exchange)
377     {
378         // Mark atomically the response as completed, with respect
379         // to concurrency between response success and response failure.
380         if (!exchange.responseComplete(null))
381             return false;
382 
383         responseState.set(ResponseState.IDLE);
384 
385         // Reset to be ready for another response.
386         reset();
387 
388         HttpResponse response = exchange.getResponse();
389         if (LOG.isDebugEnabled())
390             LOG.debug("Response success {}", response);
391         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
392         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
393         notifier.notifySuccess(listeners, response);
394 
395         // Special case for 100 Continue that cannot
396         // be handled by the ContinueProtocolHandler.
397         if (exchange.getResponse().getStatus() == HttpStatus.CONTINUE_100)
398             return true;
399 
400         // Mark atomically the response as terminated, with
401         // respect to concurrency between request and response.
402         Result result = exchange.terminateResponse();
403         terminateResponse(exchange, result);
404 
405         return true;
406     }
407 
408     /**
409      * Method to be invoked when the response is failed.
410      * <p>
411      * This method takes care of notifying {@link org.eclipse.jetty.client.api.Response.FailureListener}s.
412      *
413      * @param failure the response failure
414      * @return whether the response was processed as failed
415      */
416     protected boolean responseFailure(Throwable failure)
417     {
418         HttpExchange exchange = getHttpExchange();
419         // In case of a response error, the failure has already been notified
420         // and it is possible that a further attempt to read in the receive
421         // loop throws an exception that reenters here but without exchange;
422         // or, the server could just have timed out the connection.
423         if (exchange == null)
424             return false;
425 
426         // Mark atomically the response as completed, with respect
427         // to concurrency between response success and response failure.
428         if (exchange.responseComplete(failure))
429             return abort(exchange, failure);
430 
431         return false;
432     }
433 
434     private void terminateResponse(HttpExchange exchange)
435     {
436         Result result = exchange.terminateResponse();
437         terminateResponse(exchange, result);
438     }
439 
440     private void terminateResponse(HttpExchange exchange, Result result)
441     {
442         HttpResponse response = exchange.getResponse();
443 
444         if (LOG.isDebugEnabled())
445             LOG.debug("Response complete {}", response);
446 
447         if (result != null)
448         {
449             boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
450             if (!ordered)
451                 channel.exchangeTerminated(exchange, result);
452             if (LOG.isDebugEnabled())
453                 LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
454             List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
455             ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
456             notifier.notifyComplete(listeners, result);
457             if (ordered)
458                 channel.exchangeTerminated(exchange, result);
459         }
460     }
461 
462     /**
463      * Resets this {@link HttpReceiver} state.
464      * <p>
465      * Subclasses should override (but remember to call {@code super}) to reset their own state.
466      * <p>
467      * Either this method or {@link #dispose()} is called.
468      */
469     protected void reset()
470     {
471         decoder = null;
472     }
473 
474     /**
475      * Disposes this {@link HttpReceiver} state.
476      * <p>
477      * Subclasses should override (but remember to call {@code super}) to dispose their own state.
478      * <p>
479      * Either this method or {@link #reset()} is called.
480      */
481     protected void dispose()
482     {
483         decoder = null;
484     }
485 
486     public boolean abort(HttpExchange exchange, Throwable failure)
487     {
488         // Update the state to avoid more response processing.
489         boolean terminate;
490         out: while (true)
491         {
492             ResponseState current = responseState.get();
493             switch (current)
494             {
495                 case FAILURE:
496                 {
497                     return false;
498                 }
499                 default:
500                 {
501                     if (updateResponseState(current, ResponseState.FAILURE))
502                     {
503                         terminate = current != ResponseState.TRANSIENT;
504                         break out;
505                     }
506                     break;
507                 }
508             }
509         }
510 
511         this.failure = failure;
512 
513         dispose();
514 
515         HttpResponse response = exchange.getResponse();
516         if (LOG.isDebugEnabled())
517             LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure);
518         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
519         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
520         notifier.notifyFailure(listeners, response, failure);
521 
522         if (terminate)
523         {
524             // Mark atomically the response as terminated, with
525             // respect to concurrency between request and response.
526             Result result = exchange.terminateResponse();
527             terminateResponse(exchange, result);
528         }
529         else
530         {
531             if (LOG.isDebugEnabled())
532                 LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
533         }
534 
535         return true;
536     }
537 
538     private boolean updateResponseState(ResponseState from, ResponseState to)
539     {
540         boolean updated = responseState.compareAndSet(from, to);
541         if (!updated)
542         {
543             if (LOG.isDebugEnabled())
544                 LOG.debug("State update failed: {} -> {}: {}", from, to, responseState.get());
545         }
546         return updated;
547     }
548 
549     @Override
550     public String toString()
551     {
552         return String.format("%s@%x(rsp=%s,failure=%s)",
553                 getClass().getSimpleName(),
554                 hashCode(),
555                 responseState,
556                 failure);
557     }
558 
559     /**
560      * The request states {@link HttpReceiver} goes through when receiving a response.
561      */
562     private enum ResponseState
563     {
564         /**
565          * One of the response*() methods is being executed.
566          */
567         TRANSIENT,
568         /**
569          * The response is not yet received, the initial state
570          */
571         IDLE,
572         /**
573          * The response status code has been received
574          */
575         BEGIN,
576         /**
577          * The response headers are being received
578          */
579         HEADER,
580         /**
581          * All the response headers have been received
582          */
583         HEADERS,
584         /**
585          * The response content is being received
586          */
587         CONTENT,
588         /**
589          * The response is failed
590          */
591         FAILURE
592     }
593 }