View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.net.URI;
24  import java.nio.ByteBuffer;
25  import java.util.Collections;
26  import java.util.Enumeration;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.concurrent.TimeoutException;
31  import java.util.concurrent.atomic.AtomicMarkableReference;
32  import java.util.concurrent.atomic.AtomicReference;
33  
34  import org.eclipse.jetty.client.api.Response;
35  import org.eclipse.jetty.client.api.Result;
36  import org.eclipse.jetty.http.HttpField;
37  import org.eclipse.jetty.http.HttpHeader;
38  import org.eclipse.jetty.http.HttpMethod;
39  import org.eclipse.jetty.http.HttpParser;
40  import org.eclipse.jetty.http.HttpVersion;
41  import org.eclipse.jetty.io.ByteBufferPool;
42  import org.eclipse.jetty.io.EndPoint;
43  import org.eclipse.jetty.io.EofException;
44  import org.eclipse.jetty.util.log.Log;
45  import org.eclipse.jetty.util.log.Logger;
46  
47  public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
48  {
49      private static final Logger LOG = Log.getLogger(HttpReceiver.class);
50  
51      private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
52      private final HttpParser parser = new HttpParser(this);
53      private final HttpConnection connection;
54      private ContentDecoder decoder;
55  
56      public HttpReceiver(HttpConnection connection)
57      {
58          this.connection = connection;
59      }
60  
61      public void receive()
62      {
63          EndPoint endPoint = connection.getEndPoint();
64          HttpClient client = connection.getHttpClient();
65          ByteBufferPool bufferPool = client.getByteBufferPool();
66          ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
67          try
68          {
69              while (true)
70              {
71                  int read = endPoint.fill(buffer);
72                  LOG.debug("Read {} bytes from {}", read, connection);
73                  if (read > 0)
74                  {
75                      parse(buffer);
76                  }
77                  else if (read == 0)
78                  {
79                      fillInterested();
80                      break;
81                  }
82                  else
83                  {
84                      shutdown();
85                      break;
86                  }
87              }
88          }
89          catch (EofException x)
90          {
91              LOG.ignore(x);
92              failAndClose(x);
93          }
94          catch (Exception x)
95          {
96              LOG.debug(x);
97              failAndClose(x);
98          }
99          finally
100         {
101             bufferPool.release(buffer);
102         }
103     }
104 
105     private void parse(ByteBuffer buffer)
106     {
107         while (buffer.hasRemaining())
108             parser.parseNext(buffer);
109     }
110 
111     private void fillInterested()
112     {
113         State state = this.state.get();
114         if (state == State.IDLE || state == State.RECEIVE)
115             connection.fillInterested();
116     }
117 
118     private void shutdown()
119     {
120         // Shutting down the parser may invoke messageComplete() or fail()
121         parser.shutdownInput();
122         State state = this.state.get();
123         if (state == State.IDLE || state == State.RECEIVE)
124         {
125             if (!fail(new EOFException()))
126                 connection.close();
127         }
128     }
129 
130     @Override
131     public boolean startResponse(HttpVersion version, int status, String reason)
132     {
133         if (updateState(State.IDLE, State.RECEIVE))
134         {
135             HttpExchange exchange = connection.getExchange();
136             // The exchange may be null if it failed concurrently
137             if (exchange != null)
138             {
139                 HttpConversation conversation = exchange.getConversation();
140                 HttpResponse response = exchange.getResponse();
141 
142                 parser.setHeadResponse(exchange.getRequest().getMethod() == HttpMethod.HEAD);
143                 response.version(version).status(status).reason(reason);
144 
145                 // Probe the protocol handlers
146                 HttpClient client = connection.getHttpClient();
147                 ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
148                 Response.Listener handlerListener = null;
149                 if (protocolHandler != null)
150                 {
151                     handlerListener = protocolHandler.getResponseListener();
152                     LOG.debug("Found protocol handler {}", protocolHandler);
153                 }
154                 exchange.getConversation().updateResponseListeners(handlerListener);
155 
156                 LOG.debug("Receiving {}", response);
157                 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
158                 notifier.notifyBegin(conversation.getResponseListeners(), response);
159             }
160         }
161         return false;
162     }
163 
164     @Override
165     public boolean parsedHeader(HttpField field)
166     {
167         if (updateState(State.RECEIVE, State.RECEIVE))
168         {
169             HttpExchange exchange = connection.getExchange();
170             // The exchange may be null if it failed concurrently
171             if (exchange != null)
172             {
173                 HttpConversation conversation = exchange.getConversation();
174                 HttpResponse response = exchange.getResponse();
175                 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
176                 boolean process = notifier.notifyHeader(conversation.getResponseListeners(), response, field);
177                 if (process)
178                 {
179                     response.getHeaders().add(field);
180                     HttpHeader fieldHeader = field.getHeader();
181                     if (fieldHeader != null)
182                     {
183                         switch (fieldHeader)
184                         {
185                             case SET_COOKIE:
186                             case SET_COOKIE2:
187                             {
188                                 storeCookie(exchange.getRequest().getURI(), field);
189                                 break;
190                             }
191                             default:
192                             {
193                                 break;
194                             }
195                         }
196                     }
197                 }
198             }
199         }
200         return false;
201     }
202 
203     private void storeCookie(URI uri, HttpField field)
204     {
205         try
206         {
207             Map<String, List<String>> header = new HashMap<>(1);
208             header.put(field.getHeader().asString(), Collections.singletonList(field.getValue()));
209             connection.getHttpClient().getCookieManager().put(uri, header);
210         }
211         catch (IOException x)
212         {
213             LOG.debug(x);
214         }
215     }
216 
217     @Override
218     public boolean headerComplete()
219     {
220         if (updateState(State.RECEIVE, State.RECEIVE))
221         {
222             HttpExchange exchange = connection.getExchange();
223             // The exchange may be null if it failed concurrently
224             if (exchange != null)
225             {
226                 HttpConversation conversation = exchange.getConversation();
227                 HttpResponse response = exchange.getResponse();
228                 LOG.debug("Headers {}", response);
229                 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
230                 notifier.notifyHeaders(conversation.getResponseListeners(), response);
231 
232                 Enumeration<String> contentEncodings = response.getHeaders().getValues(HttpHeader.CONTENT_ENCODING.asString(), ",");
233                 if (contentEncodings != null)
234                 {
235                     for (ContentDecoder.Factory factory : connection.getHttpClient().getContentDecoderFactories())
236                     {
237                         while (contentEncodings.hasMoreElements())
238                         {
239                             if (factory.getEncoding().equalsIgnoreCase(contentEncodings.nextElement()))
240                             {
241                                 this.decoder = factory.newContentDecoder();
242                                 break;
243                             }
244                         }
245                     }
246                 }
247             }
248         }
249         return false;
250     }
251 
252     @Override
253     public boolean content(ByteBuffer buffer)
254     {
255         if (updateState(State.RECEIVE, State.RECEIVE))
256         {
257             HttpExchange exchange = connection.getExchange();
258             // The exchange may be null if it failed concurrently
259             if (exchange != null)
260             {
261                 HttpConversation conversation = exchange.getConversation();
262                 HttpResponse response = exchange.getResponse();
263                 LOG.debug("Content {}: {} bytes", response, buffer.remaining());
264 
265                 ContentDecoder decoder = this.decoder;
266                 if (decoder != null)
267                 {
268                     buffer = decoder.decode(buffer);
269                     LOG.debug("{} {}: {} bytes", decoder, response, buffer.remaining());
270                 }
271 
272                 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
273                 notifier.notifyContent(conversation.getResponseListeners(), response, buffer);
274             }
275         }
276         return false;
277     }
278 
279     @Override
280     public boolean messageComplete()
281     {
282         if (updateState(State.RECEIVE, State.RECEIVE))
283             success();
284         return true;
285     }
286 
287     protected boolean success()
288     {
289         HttpExchange exchange = connection.getExchange();
290         if (exchange == null)
291             return false;
292 
293         AtomicMarkableReference<Result> completion = exchange.responseComplete(null);
294         if (!completion.isMarked())
295             return false;
296 
297         parser.reset();
298         decoder = null;
299 
300         if (!updateState(State.RECEIVE, State.IDLE))
301             throw new IllegalStateException();
302 
303         exchange.terminateResponse();
304 
305         HttpResponse response = exchange.getResponse();
306         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
307         ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
308         notifier.notifySuccess(listeners, response);
309         LOG.debug("Received {}", response);
310 
311         Result result = completion.getReference();
312         if (result != null)
313         {
314             connection.complete(exchange, !result.isFailed());
315             notifier.notifyComplete(listeners, result);
316         }
317 
318         return true;
319     }
320 
321     protected boolean fail(Throwable failure)
322     {
323         HttpExchange exchange = connection.getExchange();
324         // In case of a response error, the failure has already been notified
325         // and it is possible that a further attempt to read in the receive
326         // loop throws an exception that reenters here but without exchange;
327         // or, the server could just have timed out the connection.
328         if (exchange == null)
329             return false;
330 
331         AtomicMarkableReference<Result> completion = exchange.responseComplete(failure);
332         if (!completion.isMarked())
333             return false;
334 
335         parser.close();
336         decoder = null;
337 
338         while (true)
339         {
340             State current = state.get();
341             if (updateState(current, State.FAILURE))
342                 break;
343         }
344 
345         exchange.terminateResponse();
346 
347         HttpResponse response = exchange.getResponse();
348         HttpConversation conversation = exchange.getConversation();
349         ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
350         notifier.notifyFailure(conversation.getResponseListeners(), response, failure);
351         LOG.debug("Failed {} {}", response, failure);
352 
353         Result result = completion.getReference();
354         if (result != null)
355         {
356             connection.complete(exchange, false);
357 
358             notifier.notifyComplete(conversation.getResponseListeners(), result);
359         }
360 
361         return true;
362     }
363 
364     @Override
365     public boolean earlyEOF()
366     {
367         failAndClose(new EOFException());
368         return false;
369     }
370 
371     private void failAndClose(Throwable failure)
372     {
373         fail(failure);
374         connection.close();
375     }
376 
377     @Override
378     public void badMessage(int status, String reason)
379     {
380         HttpExchange exchange = connection.getExchange();
381         HttpResponse response = exchange.getResponse();
382         response.status(status).reason(reason);
383         failAndClose(new HttpResponseException("HTTP protocol violation: bad response", response));
384     }
385 
386     public void idleTimeout()
387     {
388         // If we cannot fail, it means a response arrived
389         // just when we were timeout idling, so we don't close
390         fail(new TimeoutException());
391     }
392 
393     public boolean abort(Throwable cause)
394     {
395         return fail(cause);
396     }
397 
398     private boolean updateState(State from, State to)
399     {
400         boolean updated = state.compareAndSet(from, to);
401         if (!updated)
402             LOG.debug("State update failed: {} -> {}: {}", from, to, state.get());
403         return updated;
404     }
405 
406     private enum State
407     {
408         IDLE, RECEIVE, FAILURE
409     }
410 }