View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.net.URI;
24  import java.nio.ByteBuffer;
25  import java.util.Collections;
26  import java.util.Enumeration;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.concurrent.TimeoutException;
31  import java.util.concurrent.atomic.AtomicMarkableReference;
32  import java.util.concurrent.atomic.AtomicReference;
33  
34  import org.eclipse.jetty.client.api.Response;
35  import org.eclipse.jetty.client.api.Result;
36  import org.eclipse.jetty.http.HttpField;
37  import org.eclipse.jetty.http.HttpHeader;
38  import org.eclipse.jetty.http.HttpMethod;
39  import org.eclipse.jetty.http.HttpParser;
40  import org.eclipse.jetty.http.HttpVersion;
41  import org.eclipse.jetty.io.ByteBufferPool;
42  import org.eclipse.jetty.io.EndPoint;
43  import org.eclipse.jetty.io.EofException;
44  import org.eclipse.jetty.util.log.Log;
45  import org.eclipse.jetty.util.log.Logger;
46  
47  public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
48  {
49      private static final Logger LOG = Log.getLogger(HttpReceiver.class);
50  
51      private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
52      private final HttpParser parser = new HttpParser(this);
53      private final HttpConnection connection;
54      private ContentDecoder decoder;
55  
56      public HttpReceiver(HttpConnection connection)
57      {
58          this.connection = connection;
59      }
60  
61      public void receive()
62      {
63          EndPoint endPoint = connection.getEndPoint();
64          HttpClient client = connection.getHttpClient();
65          ByteBufferPool bufferPool = client.getByteBufferPool();
66          ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
67          try
68          {
69              while (true)
70              {
71                  // Connection may be closed in a parser callback
72                  if (connection.isClosed())
73                  {
74                      LOG.debug("{} closed", connection);
75                      break;
76                  }
77                  else
78                  {
79                      int read = endPoint.fill(buffer);
80                      LOG.debug("Read {} bytes from {}", read, connection);
81                      if (read > 0)
82                      {
83                          parse(buffer);
84                      }
85                      else if (read == 0)
86                      {
87                          fillInterested();
88                          break;
89                      }
90                      else
91                      {
92                          shutdown();
93                          break;
94                      }
95                  }
96              }
97          }
98          catch (EofException x)
99          {
100             LOG.ignore(x);
101             failAndClose(x);
102         }
103         catch (Exception x)
104         {
105             LOG.debug(x);
106             failAndClose(x);
107         }
108         finally
109         {
110             bufferPool.release(buffer);
111         }
112     }
113 
114     private void parse(ByteBuffer buffer)
115     {
116         while (buffer.hasRemaining())
117             parser.parseNext(buffer);
118     }
119 
120     private void fillInterested()
121     {
122         State state = this.state.get();
123         if (state == State.IDLE || state == State.RECEIVE)
124             connection.fillInterested();
125     }
126 
127     private void shutdown()
128     {
129         // Shutting down the parser may invoke messageComplete() or fail()
130         parser.shutdownInput();
131         State state = this.state.get();
132         if (state == State.IDLE || state == State.RECEIVE)
133         {
134             if (!fail(new EOFException()))
135                 connection.close();
136         }
137     }
138 
139 
140     @Override
141     public int getHeaderCacheSize()
142     {
143         // TODO get from configuration
144         return 256;
145     }
146 
147     @Override
148     public boolean startResponse(HttpVersion version, int status, String reason)
149     {
150         if (updateState(State.IDLE, State.RECEIVE))
151         {
152             HttpExchange exchange = connection.getExchange();
153             // The exchange may be null if it failed concurrently
154             if (exchange != null)
155             {
156                 HttpConversation conversation = exchange.getConversation();
157                 HttpResponse response = exchange.getResponse();
158 
159                 String method = exchange.getRequest().method();
160                 parser.setHeadResponse(HttpMethod.HEAD.is(method) || HttpMethod.CONNECT.is(method));
161                 response.version(version).status(status).reason(reason);
162 
163                 // Probe the protocol handlers
164                 HttpClient client = connection.getHttpClient();
165                 ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
166                 Response.Listener handlerListener = null;
167                 if (protocolHandler != null)
168                 {
169                     handlerListener = protocolHandler.getResponseListener();
170                     LOG.debug("Found protocol handler {}", protocolHandler);
171                 }
172                 exchange.getConversation().updateResponseListeners(handlerListener);
173 
174                 LOG.debug("Receiving {}", response);
175                 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
176                 notifier.notifyBegin(conversation.getResponseListeners(), response);
177             }
178         }
179         return false;
180     }
181 
182     @Override
183     public boolean parsedHeader(HttpField field)
184     {
185         if (updateState(State.RECEIVE, State.RECEIVE))
186         {
187             HttpExchange exchange = connection.getExchange();
188             // The exchange may be null if it failed concurrently
189             if (exchange != null)
190             {
191                 HttpConversation conversation = exchange.getConversation();
192                 HttpResponse response = exchange.getResponse();
193                 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
194                 boolean process = notifier.notifyHeader(conversation.getResponseListeners(), response, field);
195                 if (process)
196                 {
197                     response.getHeaders().add(field);
198                     HttpHeader fieldHeader = field.getHeader();
199                     if (fieldHeader != null)
200                     {
201                         switch (fieldHeader)
202                         {
203                             case SET_COOKIE:
204                             case SET_COOKIE2:
205                             {
206                                 storeCookie(exchange.getRequest().getURI(), field);
207                                 break;
208                             }
209                             default:
210                             {
211                                 break;
212                             }
213                         }
214                     }
215                 }
216             }
217         }
218         return false;
219     }
220 
221     private void storeCookie(URI uri, HttpField field)
222     {
223         try
224         {
225             String value = field.getValue();
226             if (value != null)
227             {
228                 Map<String, List<String>> header = new HashMap<>(1);
229                 header.put(field.getHeader().asString(), Collections.singletonList(value));
230                 connection.getHttpClient().getCookieManager().put(uri, header);
231             }
232         }
233         catch (IOException x)
234         {
235             LOG.debug(x);
236         }
237     }
238 
239     @Override
240     public boolean headerComplete()
241     {
242         if (updateState(State.RECEIVE, State.RECEIVE))
243         {
244             HttpExchange exchange = connection.getExchange();
245             // The exchange may be null if it failed concurrently
246             if (exchange != null)
247             {
248                 HttpConversation conversation = exchange.getConversation();
249                 HttpResponse response = exchange.getResponse();
250                 LOG.debug("Headers {}", response);
251                 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
252                 notifier.notifyHeaders(conversation.getResponseListeners(), response);
253 
254                 Enumeration<String> contentEncodings = response.getHeaders().getValues(HttpHeader.CONTENT_ENCODING.asString(), ",");
255                 if (contentEncodings != null)
256                 {
257                     for (ContentDecoder.Factory factory : connection.getHttpClient().getContentDecoderFactories())
258                     {
259                         while (contentEncodings.hasMoreElements())
260                         {
261                             if (factory.getEncoding().equalsIgnoreCase(contentEncodings.nextElement()))
262                             {
263                                 this.decoder = factory.newContentDecoder();
264                                 break;
265                             }
266                         }
267                     }
268                 }
269             }
270         }
271         return false;
272     }
273 
274     @Override
275     public boolean content(ByteBuffer buffer)
276     {
277         if (updateState(State.RECEIVE, State.RECEIVE))
278         {
279             HttpExchange exchange = connection.getExchange();
280             // The exchange may be null if it failed concurrently
281             if (exchange != null)
282             {
283                 HttpConversation conversation = exchange.getConversation();
284                 HttpResponse response = exchange.getResponse();
285                 LOG.debug("Content {}: {} bytes", response, buffer.remaining());
286 
287                 ContentDecoder decoder = this.decoder;
288                 if (decoder != null)
289                 {
290                     buffer = decoder.decode(buffer);
291                     LOG.debug("{} {}: {} bytes", decoder, response, buffer.remaining());
292                 }
293 
294                 ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
295                 notifier.notifyContent(conversation.getResponseListeners(), response, buffer);
296             }
297         }
298         return false;
299     }
300 
301     @Override
302     public boolean messageComplete()
303     {
304         if (updateState(State.RECEIVE, State.RECEIVE))
305             success();
306         return true;
307     }
308 
309     protected boolean success()
310     {
311         HttpExchange exchange = connection.getExchange();
312         if (exchange == null)
313             return false;
314 
315         AtomicMarkableReference<Result> completion = exchange.responseComplete(null);
316         if (!completion.isMarked())
317             return false;
318 
319         parser.reset();
320         decoder = null;
321 
322         if (!updateState(State.RECEIVE, State.IDLE))
323             throw new IllegalStateException();
324 
325         exchange.terminateResponse();
326 
327         HttpResponse response = exchange.getResponse();
328         List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
329         ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
330         notifier.notifySuccess(listeners, response);
331         LOG.debug("Received {}", response);
332 
333         Result result = completion.getReference();
334         if (result != null)
335         {
336             connection.complete(exchange, !result.isFailed());
337             notifier.notifyComplete(listeners, result);
338         }
339 
340         return true;
341     }
342 
343     protected boolean fail(Throwable failure)
344     {
345         HttpExchange exchange = connection.getExchange();
346         // In case of a response error, the failure has already been notified
347         // and it is possible that a further attempt to read in the receive
348         // loop throws an exception that reenters here but without exchange;
349         // or, the server could just have timed out the connection.
350         if (exchange == null)
351             return false;
352 
353         AtomicMarkableReference<Result> completion = exchange.responseComplete(failure);
354         if (!completion.isMarked())
355             return false;
356 
357         parser.close();
358         decoder = null;
359 
360         while (true)
361         {
362             State current = state.get();
363             if (updateState(current, State.FAILURE))
364                 break;
365         }
366 
367         exchange.terminateResponse();
368 
369         HttpResponse response = exchange.getResponse();
370         HttpConversation conversation = exchange.getConversation();
371         ResponseNotifier notifier = connection.getDestination().getResponseNotifier();
372         notifier.notifyFailure(conversation.getResponseListeners(), response, failure);
373         LOG.debug("Failed {} {}", response, failure);
374 
375         Result result = completion.getReference();
376         if (result != null)
377         {
378             connection.complete(exchange, false);
379 
380             notifier.notifyComplete(conversation.getResponseListeners(), result);
381         }
382 
383         return true;
384     }
385 
386     @Override
387     public void earlyEOF()
388     {
389         failAndClose(new EOFException());
390     }
391 
392     private void failAndClose(Throwable failure)
393     {
394         fail(failure);
395         connection.close();
396     }
397 
398     @Override
399     public void badMessage(int status, String reason)
400     {
401         HttpExchange exchange = connection.getExchange();
402         HttpResponse response = exchange.getResponse();
403         response.status(status).reason(reason);
404         failAndClose(new HttpResponseException("HTTP protocol violation: bad response", response));
405     }
406 
407     public void idleTimeout()
408     {
409         // If we cannot fail, it means a response arrived
410         // just when we were timeout idling, so we don't close
411         fail(new TimeoutException());
412     }
413 
414     public boolean abort(Throwable cause)
415     {
416         return fail(cause);
417     }
418 
419     private boolean updateState(State from, State to)
420     {
421         boolean updated = state.compareAndSet(from, to);
422         if (!updated)
423             LOG.debug("State update failed: {} -> {}: {}", from, to, state.get());
424         return updated;
425     }
426 
427     @Override
428     public String toString()
429     {
430         return String.format("%s@%x on %s", getClass().getSimpleName(), hashCode(), connection);
431     }
432 
433     private enum State
434     {
435         IDLE, RECEIVE, FAILURE
436     }
437 }