View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.Enumeration;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.concurrent.atomic.AtomicReference;
31  
32  import org.eclipse.jetty.client.api.Response;
33  import org.eclipse.jetty.client.api.Result;
34  import org.eclipse.jetty.http.HttpField;
35  import org.eclipse.jetty.http.HttpHeader;
36  import org.eclipse.jetty.util.BufferUtil;
37  import org.eclipse.jetty.util.Callback;
38  import org.eclipse.jetty.util.log.Log;
39  import org.eclipse.jetty.util.log.Logger;
40  
41  /**
42   * {@link HttpReceiver} provides the abstract code to implement the various steps of the receive of HTTP responses.
43   * <p />
44   * {@link HttpReceiver} maintains a state machine that is updated when the steps of receiving a response are executed.
45   * <p />
46   * Subclasses must handle the transport-specific details, for example how to read from the raw socket and how to parse
47   * the bytes read from the socket. Then they have to call the methods defined in this class in the following order:
48   * <ol>
49   * <li>{@link #responseBegin(HttpExchange)}, when the HTTP response data containing the HTTP status code
50   * is available</li>
51   * <li>{@link #responseHeader(HttpExchange, HttpField)}, when a HTTP field is available</li>
52   * <li>{@link #responseHeaders(HttpExchange)}, when all HTTP headers are available</li>
53   * <li>{@link #responseContent(HttpExchange, ByteBuffer, Callback)}, when HTTP content is available</li>
54   * <li>{@link #responseSuccess(HttpExchange)}, when the response is successful</li>
55   * </ol>
56   * At any time, subclasses may invoke {@link #responseFailure(Throwable)} to indicate that the response has failed
57   * (for example, because of I/O exceptions).
58   * At any time, user threads may abort the response which will cause {@link #responseFailure(Throwable)} to be
59   * invoked.
60   * <p />
61   * The state machine maintained by this class ensures that the response steps are not executed by an I/O thread
62   * if the response has already been failed.
63   *
64   * @see HttpSender
65   */
66  public abstract class HttpReceiver
67  {
68      protected static final Logger LOG = Log.getLogger(HttpReceiver.class);
69  
70      private final AtomicReference<ResponseState> responseState = new AtomicReference<>(ResponseState.IDLE);
71      private final HttpChannel channel;
72      private ContentDecoder decoder;
73      private Throwable failure;
74  
75      protected HttpReceiver(HttpChannel channel)
76      {
77          this.channel = channel;
78      }
79  
80      protected HttpChannel getHttpChannel()
81      {
82          return channel;
83      }
84  
85      protected HttpExchange getHttpExchange()
86      {
87          return channel.getHttpExchange();
88      }
89  
90      protected HttpDestination getHttpDestination()
91      {
92          return channel.getHttpDestination();
93      }
94  
95      /**
96       * Method to be invoked when the response status code is available.
97       * <p />
98       * Subclasses must have set the response status code on the {@link Response} object of the {@link HttpExchange}
99       * prior invoking this method.
100      * <p />
101      * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.BeginListener}s.
102      *
103      * @param exchange the HTTP exchange
104      * @return whether the processing should continue
105      */
106     protected boolean responseBegin(HttpExchange exchange)
107     {
108         if (!updateResponseState(ResponseState.IDLE, ResponseState.TRANSIENT))
109             return false;
110 
111         HttpConversation conversation = exchange.getConversation();
112         HttpResponse response = exchange.getResponse();
113         // Probe the protocol handlers
114         HttpDestination destination = getHttpDestination();
115         HttpClient client = destination.getHttpClient();
116         ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
117         Response.Listener handlerListener = null;
118         if (protocolHandler != null)
119         {
120             handlerListener = protocolHandler.getResponseListener();
121             if (LOG.isDebugEnabled())
122                 LOG.debug("Found protocol handler {}", protocolHandler);
123         }
124         exchange.getConversation().updateResponseListeners(handlerListener);
125 
126         if (LOG.isDebugEnabled())
127             LOG.debug("Response begin {}", response);
128         ResponseNotifier notifier = destination.getResponseNotifier();
129         notifier.notifyBegin(conversation.getResponseListeners(), response);
130 
131         if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
132             terminateResponse(exchange, failure);
133 
134         return true;
135     }
136 
137     /**
138      * Method to be invoked when a response HTTP header is available.
139      * <p />
140      * Subclasses must not have added the header to the {@link Response} object of the {@link HttpExchange}
141      * prior invoking this method.
142      * <p />
143      * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.HeaderListener}s and storing cookies.
144      *
145      * @param exchange the HTTP exchange
146      * @param field the response HTTP field
147      * @return whether the processing should continue
148      */
149     protected boolean responseHeader(HttpExchange exchange, HttpField field)
150     {
151         out: while (true)
152         {
153             ResponseState current = responseState.get();
154             switch (current)
155             {
156                 case BEGIN:
157                 case HEADER:
158                 {
159                     if (updateResponseState(current, ResponseState.TRANSIENT))
160                         break out;
161                     break;
162                 }
163                 default:
164                 {
165                     return false;
166                 }
167             }
168         }
169 
170         HttpResponse response = exchange.getResponse();
171         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
172         boolean process = notifier.notifyHeader(exchange.getConversation().getResponseListeners(), response, field);
173         if (process)
174         {
175             response.getHeaders().add(field);
176             HttpHeader fieldHeader = field.getHeader();
177             if (fieldHeader != null)
178             {
179                 switch (fieldHeader)
180                 {
181                     case SET_COOKIE:
182                     case SET_COOKIE2:
183                     {
184                         storeCookie(exchange.getRequest().getURI(), field);
185                         break;
186                     }
187                     default:
188                     {
189                         break;
190                     }
191                 }
192             }
193         }
194 
195         if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
196             terminateResponse(exchange, failure);
197 
198         return true;
199     }
200 
201     protected void storeCookie(URI uri, HttpField field)
202     {
203         try
204         {
205             String value = field.getValue();
206             if (value != null)
207             {
208                 Map<String, List<String>> header = new HashMap<>(1);
209                 header.put(field.getHeader().asString(), Collections.singletonList(value));
210                 getHttpDestination().getHttpClient().getCookieManager().put(uri, header);
211             }
212         }
213         catch (IOException x)
214         {
215             if (LOG.isDebugEnabled())
216                 LOG.debug(x);
217         }
218     }
219 
220     /**
221      * Method to be invoked after all response HTTP headers are available.
222      * <p />
223      * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.HeadersListener}s.
224      *
225      * @param exchange the HTTP exchange
226      * @return whether the processing should continue
227      */
228     protected boolean responseHeaders(HttpExchange exchange)
229     {
230         out: while (true)
231         {
232             ResponseState current = responseState.get();
233             switch (current)
234             {
235                 case BEGIN:
236                 case HEADER:
237                 {
238                     if (updateResponseState(current, ResponseState.TRANSIENT))
239                         break out;
240                     break;
241                 }
242                 default:
243                 {
244                     return false;
245                 }
246             }
247         }
248 
249         HttpResponse response = exchange.getResponse();
250         if (LOG.isDebugEnabled())
251             LOG.debug("Response headers {}{}{}", response, System.lineSeparator(), response.getHeaders().toString().trim());
252         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
253         notifier.notifyHeaders(exchange.getConversation().getResponseListeners(), response);
254 
255         Enumeration<String> contentEncodings = response.getHeaders().getValues(HttpHeader.CONTENT_ENCODING.asString(), ",");
256         if (contentEncodings != null)
257         {
258             for (ContentDecoder.Factory factory : getHttpDestination().getHttpClient().getContentDecoderFactories())
259             {
260                 while (contentEncodings.hasMoreElements())
261                 {
262                     if (factory.getEncoding().equalsIgnoreCase(contentEncodings.nextElement()))
263                     {
264                         this.decoder = factory.newContentDecoder();
265                         break;
266                     }
267                 }
268             }
269         }
270 
271         if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS))
272             terminateResponse(exchange, failure);
273 
274         return true;
275     }
276 
277     /**
278      * Method to be invoked when response HTTP content is available.
279      * <p />
280      * This method takes case of decoding the content, if necessary, and notifying {@link org.eclipse.jetty.client.api.Response.ContentListener}s.
281      *
282      * @param exchange the HTTP exchange
283      * @param buffer the response HTTP content buffer
284      * @return whether the processing should continue
285      */
286     protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, final Callback callback)
287     {
288         out: while (true)
289         {
290             ResponseState current = responseState.get();
291             switch (current)
292             {
293                 case HEADERS:
294                 case CONTENT:
295                 {
296                     if (updateResponseState(current, ResponseState.TRANSIENT))
297                         break out;
298                     break;
299                 }
300                 default:
301                 {
302                     return false;
303                 }
304             }
305         }
306 
307         HttpResponse response = exchange.getResponse();
308         if (LOG.isDebugEnabled())
309             LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
310 
311         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
312         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
313 
314         ContentDecoder decoder = this.decoder;
315         if (decoder == null)
316         {
317             notifier.notifyContent(listeners, response, buffer, callback);
318         }
319         else
320         {
321             List<ByteBuffer> decodeds = new ArrayList<>(2);
322             while (buffer.hasRemaining())
323             {
324                 ByteBuffer decoded = decoder.decode(buffer);
325                 if (!decoded.hasRemaining())
326                     continue;
327                 decodeds.add(decoded);
328                 if (LOG.isDebugEnabled())
329                     LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
330             }
331 
332             if (decodeds.isEmpty())
333             {
334                 callback.succeeded();
335             }
336             else
337             {
338                 Callback partial = new Callback.Adapter()
339                 {
340                     @Override
341                     public void failed(Throwable x)
342                     {
343                         callback.failed(x);
344                     }
345                 };
346 
347                 for (int i = 1, size = decodeds.size(); i <= size; ++i)
348                     notifier.notifyContent(listeners, response, decodeds.get(i - 1), i < size ? partial : callback);
349             }
350         }
351 
352         if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
353             terminateResponse(exchange, failure);
354 
355         return true;
356     }
357 
358     /**
359      * Method to be invoked when the response is successful.
360      * <p />
361      * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.SuccessListener}s and possibly
362      * {@link org.eclipse.jetty.client.api.Response.CompleteListener}s (if the exchange is completed).
363      *
364      * @param exchange the HTTP exchange
365      * @return whether the response was processed as successful
366      */
367     protected boolean responseSuccess(HttpExchange exchange)
368     {
369         // Mark atomically the response as completed, with respect
370         // to concurrency between response success and response failure.
371         boolean completed = exchange.responseComplete();
372         if (!completed)
373             return false;
374 
375         responseState.set(ResponseState.IDLE);
376 
377         // Reset to be ready for another response.
378         reset();
379 
380         // Mark atomically the response as terminated and succeeded,
381         // with respect to concurrency between request and response.
382         Result result = exchange.terminateResponse(null);
383 
384         // It is important to notify *after* we reset and terminate
385         // because the notification may trigger another request/response.
386         HttpResponse response = exchange.getResponse();
387         if (LOG.isDebugEnabled())
388             LOG.debug("Response success {}", response);
389         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
390         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
391         notifier.notifySuccess(listeners, response);
392 
393         terminateResponse(exchange, result);
394 
395         return true;
396     }
397 
398     /**
399      * Method to be invoked when the response is failed.
400      * <p />
401      * This method takes care of notifying {@link org.eclipse.jetty.client.api.Response.FailureListener}s.
402      *
403      * @param failure the response failure
404      * @return whether the response was processed as failed
405      */
406     protected boolean responseFailure(Throwable failure)
407     {
408         HttpExchange exchange = getHttpExchange();
409         // In case of a response error, the failure has already been notified
410         // and it is possible that a further attempt to read in the receive
411         // loop throws an exception that reenters here but without exchange;
412         // or, the server could just have timed out the connection.
413         if (exchange == null)
414             return false;
415 
416         // Mark atomically the response as completed, with respect
417         // to concurrency between response success and response failure.
418         boolean completed = exchange.responseComplete();
419         if (!completed)
420             return false;
421 
422         this.failure = failure;
423 
424         // Update the state to avoid more response processing.
425         boolean fail;
426         while (true)
427         {
428             ResponseState current = responseState.get();
429             if (updateResponseState(current, ResponseState.FAILURE))
430             {
431                 fail = current != ResponseState.TRANSIENT;
432                 break;
433             }
434         }
435 
436         dispose();
437 
438         // Mark atomically the response as terminated and failed,
439         // with respect to concurrency between request and response.
440         Result result = exchange.terminateResponse(failure);
441 
442         HttpResponse response = exchange.getResponse();
443         if (LOG.isDebugEnabled())
444             LOG.debug("Response failure {} {}", response, failure);
445         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
446         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
447         notifier.notifyFailure(listeners, response, failure);
448 
449         if (fail)
450         {
451             terminateResponse(exchange, result);
452         }
453         else
454         {
455             if (LOG.isDebugEnabled())
456                 LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
457         }
458 
459         return true;
460     }
461 
462     private void terminateResponse(HttpExchange exchange, Throwable failure)
463     {
464         Result result = exchange.terminateResponse(failure);
465         terminateResponse(exchange, result);
466     }
467 
468     private void terminateResponse(HttpExchange exchange, Result result)
469     {
470         HttpResponse response = exchange.getResponse();
471 
472         if (LOG.isDebugEnabled())
473             LOG.debug("Response complete {}", response);
474 
475         if (result != null)
476         {
477             boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
478             if (!ordered)
479                 channel.exchangeTerminated(result);
480             if (LOG.isDebugEnabled())
481                 LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", response);
482             List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
483             ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
484             notifier.notifyComplete(listeners, result);
485             if (ordered)
486                 channel.exchangeTerminated(result);
487         }
488     }
489 
490     /**
491      * Resets this {@link HttpReceiver} state.
492      * <p />
493      * Subclasses should override (but remember to call {@code super}) to reset their own state.
494      * <p />
495      * Either this method or {@link #dispose()} is called.
496      */
497     protected void reset()
498     {
499         decoder = null;
500     }
501 
502     /**
503      * Disposes this {@link HttpReceiver} state.
504      * <p />
505      * Subclasses should override (but remember to call {@code super}) to dispose their own state.
506      * <p />
507      * Either this method or {@link #reset()} is called.
508      */
509     protected void dispose()
510     {
511         decoder = null;
512     }
513 
514     public boolean abort(Throwable cause)
515     {
516         return responseFailure(cause);
517     }
518 
519     private boolean updateResponseState(ResponseState from, ResponseState to)
520     {
521         boolean updated = responseState.compareAndSet(from, to);
522         if (!updated)
523         {
524             if (LOG.isDebugEnabled())
525                 LOG.debug("State update failed: {} -> {}: {}", from, to, responseState.get());
526         }
527         return updated;
528     }
529 
530     /**
531      * The request states {@link HttpReceiver} goes through when receiving a response.
532      */
533     private enum ResponseState
534     {
535         /**
536          * One of the response*() methods is being executed.
537          */
538         TRANSIENT,
539         /**
540          * The response is not yet received, the initial state
541          */
542         IDLE,
543         /**
544          * The response status code has been received
545          */
546         BEGIN,
547         /**
548          * The response headers are being received
549          */
550         HEADER,
551         /**
552          * All the response headers have been received
553          */
554         HEADERS,
555         /**
556          * The response content is being received
557          */
558         CONTENT,
559         /**
560          * The response is failed
561          */
562         FAILURE
563     }
564 }