View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.Enumeration;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.concurrent.atomic.AtomicReference;
31  
32  import org.eclipse.jetty.client.api.Response;
33  import org.eclipse.jetty.client.api.Result;
34  import org.eclipse.jetty.http.HttpField;
35  import org.eclipse.jetty.http.HttpHeader;
36  import org.eclipse.jetty.http.HttpStatus;
37  import org.eclipse.jetty.util.BufferUtil;
38  import org.eclipse.jetty.util.Callback;
39  import org.eclipse.jetty.util.CountingCallback;
40  import org.eclipse.jetty.util.log.Log;
41  import org.eclipse.jetty.util.log.Logger;
42  
43  /**
44   * {@link HttpReceiver} provides the abstract code to implement the various steps of the receive of HTTP responses.
45   * <p>
46   * {@link HttpReceiver} maintains a state machine that is updated when the steps of receiving a response are executed.
47   * <p>
48   * Subclasses must handle the transport-specific details, for example how to read from the raw socket and how to parse
49   * the bytes read from the socket. Then they have to call the methods defined in this class in the following order:
50   * <ol>
51   * <li>{@link #responseBegin(HttpExchange)}, when the HTTP response data containing the HTTP status code
52   * is available</li>
53   * <li>{@link #responseHeader(HttpExchange, HttpField)}, when a HTTP field is available</li>
54   * <li>{@link #responseHeaders(HttpExchange)}, when all HTTP headers are available</li>
55   * <li>{@link #responseContent(HttpExchange, ByteBuffer, Callback)}, when HTTP content is available</li>
56   * <li>{@link #responseSuccess(HttpExchange)}, when the response is successful</li>
57   * </ol>
58   * At any time, subclasses may invoke {@link #responseFailure(Throwable)} to indicate that the response has failed
59   * (for example, because of I/O exceptions).
60   * At any time, user threads may abort the response which will cause {@link #responseFailure(Throwable)} to be
61   * invoked.
62   * <p>
63   * The state machine maintained by this class ensures that the response steps are not executed by an I/O thread
64   * if the response has already been failed.
65   *
66   * @see HttpSender
67   */
68  public abstract class HttpReceiver
69  {
70      protected static final Logger LOG = Log.getLogger(HttpReceiver.class);
71  
72      private final AtomicReference<ResponseState> responseState = new AtomicReference<>(ResponseState.IDLE);
73      private final HttpChannel channel;
74      private ContentDecoder decoder;
75      private Throwable failure;
76  
77      protected HttpReceiver(HttpChannel channel)
78      {
79          this.channel = channel;
80      }
81  
82      protected HttpChannel getHttpChannel()
83      {
84          return channel;
85      }
86  
87      protected HttpExchange getHttpExchange()
88      {
89          return channel.getHttpExchange();
90      }
91  
92      protected HttpDestination getHttpDestination()
93      {
94          return channel.getHttpDestination();
95      }
96  
97      /**
98       * Method to be invoked when the response status code is available.
99       * <p>
100      * Subclasses must have set the response status code on the {@link Response} object of the {@link HttpExchange}
101      * prior invoking this method.
102      * <p>
103      * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.BeginListener}s.
104      *
105      * @param exchange the HTTP exchange
106      * @return whether the processing should continue
107      */
108     protected boolean responseBegin(HttpExchange exchange)
109     {
110         if (!updateResponseState(ResponseState.IDLE, ResponseState.TRANSIENT))
111             return false;
112 
113         HttpConversation conversation = exchange.getConversation();
114         HttpResponse response = exchange.getResponse();
115         // Probe the protocol handlers
116         HttpDestination destination = getHttpDestination();
117         HttpClient client = destination.getHttpClient();
118         ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
119         Response.Listener handlerListener = null;
120         if (protocolHandler != null)
121         {
122             handlerListener = protocolHandler.getResponseListener();
123             if (LOG.isDebugEnabled())
124                 LOG.debug("Found protocol handler {}", protocolHandler);
125         }
126         exchange.getConversation().updateResponseListeners(handlerListener);
127 
128         if (LOG.isDebugEnabled())
129             LOG.debug("Response begin {}", response);
130         ResponseNotifier notifier = destination.getResponseNotifier();
131         notifier.notifyBegin(conversation.getResponseListeners(), response);
132 
133         if (updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
134             return true;
135 
136         terminateResponse(exchange);
137         return false;
138     }
139 
140     /**
141      * Method to be invoked when a response HTTP header is available.
142      * <p>
143      * Subclasses must not have added the header to the {@link Response} object of the {@link HttpExchange}
144      * prior invoking this method.
145      * <p>
146      * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.HeaderListener}s and storing cookies.
147      *
148      * @param exchange the HTTP exchange
149      * @param field the response HTTP field
150      * @return whether the processing should continue
151      */
152     protected boolean responseHeader(HttpExchange exchange, HttpField field)
153     {
154         out: while (true)
155         {
156             ResponseState current = responseState.get();
157             switch (current)
158             {
159                 case BEGIN:
160                 case HEADER:
161                 {
162                     if (updateResponseState(current, ResponseState.TRANSIENT))
163                         break out;
164                     break;
165                 }
166                 default:
167                 {
168                     return false;
169                 }
170             }
171         }
172 
173         HttpResponse response = exchange.getResponse();
174         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
175         boolean process = notifier.notifyHeader(exchange.getConversation().getResponseListeners(), response, field);
176         if (process)
177         {
178             response.getHeaders().add(field);
179             HttpHeader fieldHeader = field.getHeader();
180             if (fieldHeader != null)
181             {
182                 switch (fieldHeader)
183                 {
184                     case SET_COOKIE:
185                     case SET_COOKIE2:
186                     {
187                         storeCookie(exchange.getRequest().getURI(), field);
188                         break;
189                     }
190                     default:
191                     {
192                         break;
193                     }
194                 }
195             }
196         }
197 
198         if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
199             return true;
200 
201         terminateResponse(exchange);
202         return false;
203     }
204 
205     protected void storeCookie(URI uri, HttpField field)
206     {
207         try
208         {
209             String value = field.getValue();
210             if (value != null)
211             {
212                 Map<String, List<String>> header = new HashMap<>(1);
213                 header.put(field.getHeader().asString(), Collections.singletonList(value));
214                 getHttpDestination().getHttpClient().getCookieManager().put(uri, header);
215             }
216         }
217         catch (IOException x)
218         {
219             if (LOG.isDebugEnabled())
220                 LOG.debug(x);
221         }
222     }
223 
224     /**
225      * Method to be invoked after all response HTTP headers are available.
226      * <p>
227      * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.HeadersListener}s.
228      *
229      * @param exchange the HTTP exchange
230      * @return whether the processing should continue
231      */
232     protected boolean responseHeaders(HttpExchange exchange)
233     {
234         out: while (true)
235         {
236             ResponseState current = responseState.get();
237             switch (current)
238             {
239                 case BEGIN:
240                 case HEADER:
241                 {
242                     if (updateResponseState(current, ResponseState.TRANSIENT))
243                         break out;
244                     break;
245                 }
246                 default:
247                 {
248                     return false;
249                 }
250             }
251         }
252 
253         HttpResponse response = exchange.getResponse();
254         if (LOG.isDebugEnabled())
255             LOG.debug("Response headers {}{}{}", response, System.lineSeparator(), response.getHeaders().toString().trim());
256         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
257         notifier.notifyHeaders(exchange.getConversation().getResponseListeners(), response);
258 
259         Enumeration<String> contentEncodings = response.getHeaders().getValues(HttpHeader.CONTENT_ENCODING.asString(), ",");
260         if (contentEncodings != null)
261         {
262             for (ContentDecoder.Factory factory : getHttpDestination().getHttpClient().getContentDecoderFactories())
263             {
264                 while (contentEncodings.hasMoreElements())
265                 {
266                     if (factory.getEncoding().equalsIgnoreCase(contentEncodings.nextElement()))
267                     {
268                         this.decoder = factory.newContentDecoder();
269                         break;
270                     }
271                 }
272             }
273         }
274 
275         if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS))
276             return true;
277 
278         terminateResponse(exchange);
279         return false;
280     }
281 
282     /**
283      * Method to be invoked when response HTTP content is available.
284      * <p>
285      * This method takes case of decoding the content, if necessary, and notifying {@link org.eclipse.jetty.client.api.Response.ContentListener}s.
286      *
287      * @param exchange the HTTP exchange
288      * @param buffer the response HTTP content buffer
289      * @param callback the callback
290      * @return whether the processing should continue
291      */
292     protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, final Callback callback)
293     {
294         out: while (true)
295         {
296             ResponseState current = responseState.get();
297             switch (current)
298             {
299                 case HEADERS:
300                 case CONTENT:
301                 {
302                     if (updateResponseState(current, ResponseState.TRANSIENT))
303                         break out;
304                     break;
305                 }
306                 default:
307                 {
308                     return false;
309                 }
310             }
311         }
312 
313         HttpResponse response = exchange.getResponse();
314         if (LOG.isDebugEnabled())
315             LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
316 
317         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
318         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
319 
320         ContentDecoder decoder = this.decoder;
321         if (decoder == null)
322         {
323             notifier.notifyContent(listeners, response, buffer, callback);
324         }
325         else
326         {
327             List<ByteBuffer> decodeds = new ArrayList<>(2);
328             while (buffer.hasRemaining())
329             {
330                 ByteBuffer decoded = decoder.decode(buffer);
331                 if (!decoded.hasRemaining())
332                     continue;
333                 decodeds.add(decoded);
334                 if (LOG.isDebugEnabled())
335                     LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
336             }
337 
338             if (decodeds.isEmpty())
339             {
340                 callback.succeeded();
341             }
342             else
343             {
344                 int size = decodeds.size();
345                 CountingCallback counter = new CountingCallback(callback, size);
346                 for (int i = 0; i < size; ++i)
347                     notifier.notifyContent(listeners, response, decodeds.get(i), counter);
348             }
349         }
350 
351         if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
352             return true;
353 
354         terminateResponse(exchange);
355         return false;
356     }
357 
358     /**
359      * Method to be invoked when the response is successful.
360      * <p>
361      * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.SuccessListener}s and possibly
362      * {@link org.eclipse.jetty.client.api.Response.CompleteListener}s (if the exchange is completed).
363      *
364      * @param exchange the HTTP exchange
365      * @return whether the response was processed as successful
366      */
367     protected boolean responseSuccess(HttpExchange exchange)
368     {
369         // Mark atomically the response as completed, with respect
370         // to concurrency between response success and response failure.
371         if (!exchange.responseComplete(null))
372             return false;
373 
374         responseState.set(ResponseState.IDLE);
375 
376         // Reset to be ready for another response.
377         reset();
378 
379         HttpResponse response = exchange.getResponse();
380         if (LOG.isDebugEnabled())
381             LOG.debug("Response success {}", response);
382         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
383         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
384         notifier.notifySuccess(listeners, response);
385 
386         // Special case for 100 Continue that cannot
387         // be handled by the ContinueProtocolHandler.
388         if (exchange.getResponse().getStatus() == HttpStatus.CONTINUE_100)
389             return true;
390 
391         // Mark atomically the response as terminated, with
392         // respect to concurrency between request and response.
393         Result result = exchange.terminateResponse();
394         terminateResponse(exchange, result);
395 
396         return true;
397     }
398 
399     /**
400      * Method to be invoked when the response is failed.
401      * <p>
402      * This method takes care of notifying {@link org.eclipse.jetty.client.api.Response.FailureListener}s.
403      *
404      * @param failure the response failure
405      * @return whether the response was processed as failed
406      */
407     protected boolean responseFailure(Throwable failure)
408     {
409         HttpExchange exchange = getHttpExchange();
410         // In case of a response error, the failure has already been notified
411         // and it is possible that a further attempt to read in the receive
412         // loop throws an exception that reenters here but without exchange;
413         // or, the server could just have timed out the connection.
414         if (exchange == null)
415             return false;
416 
417         // Mark atomically the response as completed, with respect
418         // to concurrency between response success and response failure.
419         if (exchange.responseComplete(failure))
420             return abort(exchange, failure);
421 
422         return false;
423     }
424 
425     private void terminateResponse(HttpExchange exchange)
426     {
427         Result result = exchange.terminateResponse();
428         terminateResponse(exchange, result);
429     }
430 
431     private void terminateResponse(HttpExchange exchange, Result result)
432     {
433         HttpResponse response = exchange.getResponse();
434 
435         if (LOG.isDebugEnabled())
436             LOG.debug("Response complete {}", response);
437 
438         if (result != null)
439         {
440             boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
441             if (!ordered)
442                 channel.exchangeTerminated(exchange, result);
443             if (LOG.isDebugEnabled())
444                 LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
445             List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
446             ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
447             notifier.notifyComplete(listeners, result);
448             if (ordered)
449                 channel.exchangeTerminated(exchange, result);
450         }
451     }
452 
453     /**
454      * Resets this {@link HttpReceiver} state.
455      * <p>
456      * Subclasses should override (but remember to call {@code super}) to reset their own state.
457      * <p>
458      * Either this method or {@link #dispose()} is called.
459      */
460     protected void reset()
461     {
462         decoder = null;
463     }
464 
465     /**
466      * Disposes this {@link HttpReceiver} state.
467      * <p>
468      * Subclasses should override (but remember to call {@code super}) to dispose their own state.
469      * <p>
470      * Either this method or {@link #reset()} is called.
471      */
472     protected void dispose()
473     {
474         decoder = null;
475     }
476 
477     public boolean abort(HttpExchange exchange, Throwable failure)
478     {
479         // Update the state to avoid more response processing.
480         boolean terminate;
481         out: while (true)
482         {
483             ResponseState current = responseState.get();
484             switch (current)
485             {
486                 case FAILURE:
487                 {
488                     return false;
489                 }
490                 default:
491                 {
492                     if (updateResponseState(current, ResponseState.FAILURE))
493                     {
494                         terminate = current != ResponseState.TRANSIENT;
495                         break out;
496                     }
497                     break;
498                 }
499             }
500         }
501 
502         this.failure = failure;
503 
504         dispose();
505 
506         HttpResponse response = exchange.getResponse();
507         if (LOG.isDebugEnabled())
508             LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure);
509         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
510         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
511         notifier.notifyFailure(listeners, response, failure);
512 
513         if (terminate)
514         {
515             // Mark atomically the response as terminated, with
516             // respect to concurrency between request and response.
517             Result result = exchange.terminateResponse();
518             terminateResponse(exchange, result);
519         }
520         else
521         {
522             if (LOG.isDebugEnabled())
523                 LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
524         }
525 
526         return true;
527     }
528 
529     private boolean updateResponseState(ResponseState from, ResponseState to)
530     {
531         boolean updated = responseState.compareAndSet(from, to);
532         if (!updated)
533         {
534             if (LOG.isDebugEnabled())
535                 LOG.debug("State update failed: {} -> {}: {}", from, to, responseState.get());
536         }
537         return updated;
538     }
539 
540     @Override
541     public String toString()
542     {
543         return String.format("%s@%x(rsp=%s,failure=%s)",
544                 getClass().getSimpleName(),
545                 hashCode(),
546                 responseState,
547                 failure);
548     }
549 
550     /**
551      * The request states {@link HttpReceiver} goes through when receiving a response.
552      */
553     private enum ResponseState
554     {
555         /**
556          * One of the response*() methods is being executed.
557          */
558         TRANSIENT,
559         /**
560          * The response is not yet received, the initial state
561          */
562         IDLE,
563         /**
564          * The response status code has been received
565          */
566         BEGIN,
567         /**
568          * The response headers are being received
569          */
570         HEADER,
571         /**
572          * All the response headers have been received
573          */
574         HEADERS,
575         /**
576          * The response content is being received
577          */
578         CONTENT,
579         /**
580          * The response is failed
581          */
582         FAILURE
583     }
584 }