View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.net.URI;
23  import java.nio.ByteBuffer;
24  import java.util.Collections;
25  import java.util.Enumeration;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.concurrent.atomic.AtomicReference;
30  
31  import org.eclipse.jetty.client.api.Response;
32  import org.eclipse.jetty.client.api.Result;
33  import org.eclipse.jetty.http.HttpField;
34  import org.eclipse.jetty.http.HttpHeader;
35  import org.eclipse.jetty.util.BufferUtil;
36  import org.eclipse.jetty.util.log.Log;
37  import org.eclipse.jetty.util.log.Logger;
38  
39  /**
40   * {@link HttpReceiver} provides the abstract code to implement the various steps of the receive of HTTP responses.
41   * <p />
42   * {@link HttpReceiver} maintains a state machine that is updated when the steps of receiving a response are executed.
43   * <p />
44   * Subclasses must handle the transport-specific details, for example how to read from the raw socket and how to parse
45   * the bytes read from the socket. Then they have to call the methods defined in this class in the following order:
46   * <ol>
47   * <li>{@link #responseBegin(HttpExchange)}, when the HTTP response data containing the HTTP status code
48   * is available</li>
49   * <li>{@link #responseHeader(HttpExchange, HttpField)}, when a HTTP field is available</li>
50   * <li>{@link #responseHeaders(HttpExchange)}, when all HTTP headers are available</li>
51   * <li>{@link #responseContent(HttpExchange, ByteBuffer)}, when HTTP content is available; this is the only method
52   * that may be invoked multiple times with different buffers containing different content</li>
53   * <li>{@link #responseSuccess(HttpExchange)}, when the response is complete</li>
54   * </ol>
55   * At any time, subclasses may invoke {@link #responseFailure(Throwable)} to indicate that the response has failed
56   * (for example, because of I/O exceptions).
57   * At any time, user threads may abort the response which will cause {@link #responseFailure(Throwable)} to be
58   * invoked.
59   * <p />
60   * The state machine maintained by this class ensures that the response steps are not executed by an I/O thread
61   * if the response has already been failed.
62   *
63   * @see HttpSender
64   */
65  public abstract class HttpReceiver
66  {
67      protected static final Logger LOG = Log.getLogger(HttpReceiver.class);
68  
69      private final AtomicReference<ResponseState> responseState = new AtomicReference<>(ResponseState.IDLE);
70      private final HttpChannel channel;
71      private volatile ContentDecoder decoder;
72  
73      protected HttpReceiver(HttpChannel channel)
74      {
75          this.channel = channel;
76      }
77  
78      protected HttpChannel getHttpChannel()
79      {
80          return channel;
81      }
82  
83      protected HttpExchange getHttpExchange()
84      {
85          return channel.getHttpExchange();
86      }
87  
88      protected HttpDestination getHttpDestination()
89      {
90          return channel.getHttpDestination();
91      }
92  
93      /**
94       * Method to be invoked when the response status code is available.
95       * <p />
96       * Subclasses must have set the response status code on the {@link Response} object of the {@link HttpExchange}
97       * prior invoking this method.
98       * <p />
99       * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.BeginListener}s.
100      *
101      * @param exchange the HTTP exchange
102      * @return whether the processing should continue
103      */
104     protected boolean responseBegin(HttpExchange exchange)
105     {
106         if (!updateResponseState(ResponseState.IDLE, ResponseState.BEGIN))
107             return false;
108 
109         HttpConversation conversation = exchange.getConversation();
110         HttpResponse response = exchange.getResponse();
111         // Probe the protocol handlers
112         HttpDestination destination = getHttpDestination();
113         HttpClient client = destination.getHttpClient();
114         ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
115         Response.Listener handlerListener = null;
116         if (protocolHandler != null)
117         {
118             handlerListener = protocolHandler.getResponseListener();
119             LOG.debug("Found protocol handler {}", protocolHandler);
120         }
121         exchange.getConversation().updateResponseListeners(handlerListener);
122 
123         LOG.debug("Response begin {}", response);
124         ResponseNotifier notifier = destination.getResponseNotifier();
125         notifier.notifyBegin(conversation.getResponseListeners(), response);
126 
127         return true;
128     }
129 
130     /**
131      * Method to be invoked when a response HTTP header is available.
132      * <p />
133      * Subclasses must not have added the header to the {@link Response} object of the {@link HttpExchange}
134      * prior invoking this method.
135      * <p />
136      * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.HeaderListener}s and storing cookies.
137      *
138      * @param exchange the HTTP exchange
139      * @param field the response HTTP field
140      * @return whether the processing should continue
141      */
142     protected boolean responseHeader(HttpExchange exchange, HttpField field)
143     {
144         out: while (true)
145         {
146             ResponseState current = responseState.get();
147             switch (current)
148             {
149                 case BEGIN:
150                 case HEADER:
151                 {
152                     if (updateResponseState(current, ResponseState.HEADER))
153                         break out;
154                     break;
155                 }
156                 default:
157                 {
158                     return false;
159                 }
160             }
161         }
162 
163         HttpResponse response = exchange.getResponse();
164         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
165         boolean process = notifier.notifyHeader(exchange.getConversation().getResponseListeners(), response, field);
166         if (process)
167         {
168             response.getHeaders().add(field);
169             HttpHeader fieldHeader = field.getHeader();
170             if (fieldHeader != null)
171             {
172                 switch (fieldHeader)
173                 {
174                     case SET_COOKIE:
175                     case SET_COOKIE2:
176                     {
177                         storeCookie(exchange.getRequest().getURI(), field);
178                         break;
179                     }
180                     default:
181                     {
182                         break;
183                     }
184                 }
185             }
186         }
187 
188         return true;
189     }
190 
191     protected void storeCookie(URI uri, HttpField field)
192     {
193         try
194         {
195             String value = field.getValue();
196             if (value != null)
197             {
198                 Map<String, List<String>> header = new HashMap<>(1);
199                 header.put(field.getHeader().asString(), Collections.singletonList(value));
200                 getHttpDestination().getHttpClient().getCookieManager().put(uri, header);
201             }
202         }
203         catch (IOException x)
204         {
205             LOG.debug(x);
206         }
207     }
208 
209     /**
210      * Method to be invoked after all response HTTP headers are available.
211      * <p />
212      * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.HeadersListener}s.
213      *
214      * @param exchange the HTTP exchange
215      * @return whether the processing should continue
216      */
217     protected boolean responseHeaders(HttpExchange exchange)
218     {
219         out: while (true)
220         {
221             ResponseState current = responseState.get();
222             switch (current)
223             {
224                 case BEGIN:
225                 case HEADER:
226                 {
227                     if (updateResponseState(current, ResponseState.HEADERS))
228                         break out;
229                     break;
230                 }
231                 default:
232                 {
233                     return false;
234                 }
235             }
236         }
237 
238         HttpResponse response = exchange.getResponse();
239         if (LOG.isDebugEnabled())
240             LOG.debug("Response headers {}{}{}", response, System.getProperty("line.separator"), response.getHeaders().toString().trim());
241         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
242         notifier.notifyHeaders(exchange.getConversation().getResponseListeners(), response);
243 
244         Enumeration<String> contentEncodings = response.getHeaders().getValues(HttpHeader.CONTENT_ENCODING.asString(), ",");
245         if (contentEncodings != null)
246         {
247             for (ContentDecoder.Factory factory : getHttpDestination().getHttpClient().getContentDecoderFactories())
248             {
249                 while (contentEncodings.hasMoreElements())
250                 {
251                     if (factory.getEncoding().equalsIgnoreCase(contentEncodings.nextElement()))
252                     {
253                         this.decoder = factory.newContentDecoder();
254                         break;
255                     }
256                 }
257             }
258         }
259 
260         return true;
261     }
262 
263     /**
264      * Method to be invoked when response HTTP content is available.
265      * <p />
266      * This method takes case of decoding the content, if necessary, and notifying {@link org.eclipse.jetty.client.api.Response.ContentListener}s.
267      *
268      * @param exchange the HTTP exchange
269      * @param buffer the response HTTP content buffer
270      * @return whether the processing should continue
271      */
272     protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer)
273     {
274         out: while (true)
275         {
276             ResponseState current = responseState.get();
277             switch (current)
278             {
279                 case HEADERS:
280                 case CONTENT:
281                 {
282                     if (updateResponseState(current, ResponseState.CONTENT))
283                         break out;
284                     break;
285                 }
286                 default:
287                 {
288                     return false;
289                 }
290             }
291         }
292 
293         HttpResponse response = exchange.getResponse();
294         if (LOG.isDebugEnabled())
295             LOG.debug("Response content {}{}{}", response, System.getProperty("line.separator"), BufferUtil.toDetailString(buffer));
296 
297         ContentDecoder decoder = this.decoder;
298         if (decoder != null)
299         {
300             buffer = decoder.decode(buffer);
301             if (LOG.isDebugEnabled())
302                 LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.getProperty("line.separator"), BufferUtil.toDetailString(buffer));
303         }
304 
305         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
306         notifier.notifyContent(exchange.getConversation().getResponseListeners(), response, buffer);
307 
308         return true;
309     }
310 
311     /**
312      * Method to be invoked when the response is successful.
313      * <p />
314      * This method takes case of notifying {@link org.eclipse.jetty.client.api.Response.SuccessListener}s and possibly
315      * {@link org.eclipse.jetty.client.api.Response.CompleteListener}s (if the exchange is completed).
316      *
317      * @param exchange the HTTP exchange
318      * @return whether the response was processed as successful
319      */
320     protected boolean responseSuccess(HttpExchange exchange)
321     {
322         // Mark atomically the response as completed, with respect
323         // to concurrency between response success and response failure.
324         boolean completed = exchange.responseComplete();
325         if (!completed)
326             return false;
327 
328         // Reset to be ready for another response
329         reset();
330 
331         // Mark atomically the response as terminated and succeeded,
332         // with respect to concurrency between request and response.
333         // If there is a non-null result, then both sender and
334         // receiver are reset and ready to be reused, and the
335         // connection closed/pooled (depending on the transport).
336         Result result = exchange.terminateResponse(null);
337 
338         HttpResponse response = exchange.getResponse();
339         LOG.debug("Response success {}", response);
340         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
341         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
342         notifier.notifySuccess(listeners, response);
343 
344         if (result != null)
345         {
346             boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
347             if (!ordered)
348                 channel.exchangeTerminated(result);
349             LOG.debug("Request/Response complete {}", response);
350             notifier.notifyComplete(listeners, result);
351             if (ordered)
352                 channel.exchangeTerminated(result);
353         }
354 
355         return true;
356     }
357 
358     /**
359      * Method to be invoked when the response is failed.
360      * <p />
361      * This method takes care of notifying {@link org.eclipse.jetty.client.api.Response.FailureListener}s.
362      *
363      * @param failure the response failure
364      * @return whether the response was processed as failed
365      */
366     protected boolean responseFailure(Throwable failure)
367     {
368         HttpExchange exchange = getHttpExchange();
369         // In case of a response error, the failure has already been notified
370         // and it is possible that a further attempt to read in the receive
371         // loop throws an exception that reenters here but without exchange;
372         // or, the server could just have timed out the connection.
373         if (exchange == null)
374             return false;
375 
376         // Mark atomically the response as completed, with respect
377         // to concurrency between response success and response failure.
378         boolean completed = exchange.responseComplete();
379         if (!completed)
380             return false;
381 
382         // Dispose to avoid further responses
383         dispose();
384 
385         // Mark atomically the response as terminated and failed,
386         // with respect to concurrency between request and response.
387         Result result = exchange.terminateResponse(failure);
388 
389         HttpResponse response = exchange.getResponse();
390         LOG.debug("Response failure {} {}", response, failure);
391         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
392         ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
393         notifier.notifyFailure(listeners, response, failure);
394 
395         if (result != null)
396         {
397             boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
398             if (!ordered)
399                 channel.exchangeTerminated(result);
400             notifier.notifyComplete(listeners, result);
401             if (ordered)
402                 channel.exchangeTerminated(result);
403         }
404 
405         return true;
406     }
407 
408     /**
409      * Resets this {@link HttpReceiver} state.
410      * <p />
411      * Subclasses should override (but remember to call {@code super}) to reset their own state.
412      * <p />
413      * Either this method or {@link #dispose()} is called.
414      */
415     protected void reset()
416     {
417         decoder = null;
418         responseState.set(ResponseState.IDLE);
419     }
420 
421     /**
422      * Disposes this {@link HttpReceiver} state.
423      * <p />
424      * Subclasses should override (but remember to call {@code super}) to dispose their own state.
425      * <p />
426      * Either this method or {@link #reset()} is called.
427      */
428     protected void dispose()
429     {
430         decoder = null;
431         responseState.set(ResponseState.FAILURE);
432     }
433 
434     public boolean abort(Throwable cause)
435     {
436         return responseFailure(cause);
437     }
438 
439     private boolean updateResponseState(ResponseState from, ResponseState to)
440     {
441         boolean updated = responseState.compareAndSet(from, to);
442         if (!updated)
443             LOG.debug("State update failed: {} -> {}: {}", from, to, responseState.get());
444         return updated;
445     }
446 
447     /**
448      * The request states {@link HttpReceiver} goes through when receiving a response.
449      */
450     private enum ResponseState
451     {
452         /**
453          * The response is not yet received, the initial state
454          */
455         IDLE,
456         /**
457          * The response status code has been received
458          */
459         BEGIN,
460         /**
461          * The response headers are being received
462          */
463         HEADER,
464         /**
465          * All the response headers have been received
466          */
467         HEADERS,
468         /**
469          * The response content is being received
470          */
471         CONTENT,
472         /**
473          * The response is failed
474          */
475         FAILURE
476     }
477 }