View Javadoc

//
//  ========================================================================
//  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//
18  
19  package org.eclipse.jetty.client.http;
20  
21  import java.io.EOFException;
22  import java.nio.ByteBuffer;
23  
24  import org.eclipse.jetty.client.HttpClient;
25  import org.eclipse.jetty.client.HttpExchange;
26  import org.eclipse.jetty.client.HttpReceiver;
27  import org.eclipse.jetty.client.HttpResponse;
28  import org.eclipse.jetty.client.HttpResponseException;
29  import org.eclipse.jetty.http.HttpField;
30  import org.eclipse.jetty.http.HttpMethod;
31  import org.eclipse.jetty.http.HttpParser;
32  import org.eclipse.jetty.http.HttpStatus;
33  import org.eclipse.jetty.http.HttpVersion;
34  import org.eclipse.jetty.io.ByteBufferPool;
35  import org.eclipse.jetty.io.EndPoint;
36  import org.eclipse.jetty.util.BufferUtil;
37  import org.eclipse.jetty.util.CompletableCallback;
38  
39  public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.ResponseHandler
40  {
41      private final HttpParser parser = new HttpParser(this);
42      private ByteBuffer buffer;
43      private boolean shutdown;
44  
45      public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
46      {
47          super(channel);
48      }
49  
50      @Override
51      public HttpChannelOverHTTP getHttpChannel()
52      {
53          return (HttpChannelOverHTTP)super.getHttpChannel();
54      }
55  
56      private HttpConnectionOverHTTP getHttpConnection()
57      {
58          return getHttpChannel().getHttpConnection();
59      }
60  
61      protected ByteBuffer getResponseBuffer()
62      {
63          return buffer;
64      }
65  
66      public void receive()
67      {
68          if (buffer == null)
69              acquireBuffer();
70          process();
71      }
72  
73      private void acquireBuffer()
74      {
75          HttpClient client = getHttpDestination().getHttpClient();
76          ByteBufferPool bufferPool = client.getByteBufferPool();
77          buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
78      }
79  
80      private void releaseBuffer()
81      {
82          if (buffer == null)
83              throw new IllegalStateException();
84          if (BufferUtil.hasContent(buffer))
85              throw new IllegalStateException();
86          HttpClient client = getHttpDestination().getHttpClient();
87          ByteBufferPool bufferPool = client.getByteBufferPool();
88          bufferPool.release(buffer);
89          buffer = null;
90      }
91  
92      private void process()
93      {
94          try
95          {
96              HttpConnectionOverHTTP connection = getHttpConnection();
97              EndPoint endPoint = connection.getEndPoint();
98              while (true)
99              {
100                 boolean upgraded = connection != endPoint.getConnection();
101 
102                 // Connection may be closed or upgraded in a parser callback.
103                 if (connection.isClosed() || upgraded)
104                 {
105                     if (LOG.isDebugEnabled())
106                         LOG.debug("{} {}", connection, upgraded ? "upgraded" : "closed");
107                     releaseBuffer();
108                     return;
109                 }
110 
111                 if (parse())
112                     return;
113 
114                 int read = endPoint.fill(buffer);
115                 if (LOG.isDebugEnabled())
116                     LOG.debug("Read {} bytes {} from {}", read, BufferUtil.toDetailString(buffer), endPoint);
117 
118                 if (read > 0)
119                 {
120                     if (parse())
121                         return;
122                 }
123                 else if (read == 0)
124                 {
125                     releaseBuffer();
126                     fillInterested();
127                     return;
128                 }
129                 else
130                 {
131                     releaseBuffer();
132                     shutdown();
133                     return;
134                 }
135             }
136         }
137         catch (Throwable x)
138         {
139             if (LOG.isDebugEnabled())
140                 LOG.debug(x);
141             BufferUtil.clear(buffer);
142             if (buffer != null)
143                 releaseBuffer();
144             failAndClose(x);
145         }
146     }
147 
148     /**
149      * Parses a HTTP response in the receivers buffer.
150      *
151      * @return true to indicate that parsing should be interrupted (and will be resumed by another thread).
152      */
153     private boolean parse()
154     {
155         while (true)
156         {
157             // Must parse even if the buffer is fully consumed, to allow the
158             // parser to advance from asynchronous content to response complete.
159             boolean handle = parser.parseNext(buffer);
160             if (LOG.isDebugEnabled())
161                 LOG.debug("Parsed {}, remaining {} {}", handle, buffer.remaining(), parser);
162             if (handle || !buffer.hasRemaining())
163                 return handle;
164         }
165     }
166 
167     protected void fillInterested()
168     {
169         getHttpConnection().fillInterested();
170     }
171 
172     private void shutdown()
173     {
174         // Mark this receiver as shutdown, so that we can
175         // close the connection when the exchange terminates.
176         // We cannot close the connection from here because
177         // the request may still be in process.
178         shutdown = true;
179 
180         // Shutting down the parser may invoke messageComplete() or earlyEOF().
181         // In case of content delimited by EOF, without a Connection: close
182         // header, the connection will be closed at exchange termination
183         // thanks to the flag we have set above.
184         parser.atEOF();
185         parser.parseNext(BufferUtil.EMPTY_BUFFER);
186     }
187 
188     protected boolean isShutdown()
189     {
190         return shutdown;
191     }
192 
193     @Override
194     public int getHeaderCacheSize()
195     {
196         // TODO get from configuration
197         return 256;
198     }
199 
200     @Override
201     public boolean startResponse(HttpVersion version, int status, String reason)
202     {
203         HttpExchange exchange = getHttpExchange();
204         if (exchange == null)
205             return false;
206 
207         String method = exchange.getRequest().getMethod();
208         parser.setHeadResponse(HttpMethod.HEAD.is(method) || HttpMethod.CONNECT.is(method));
209         exchange.getResponse().version(version).status(status).reason(reason);
210 
211         return !responseBegin(exchange);
212     }
213 
214     @Override
215     public void parsedHeader(HttpField field)
216     {
217         HttpExchange exchange = getHttpExchange();
218         if (exchange == null)
219             return;
220 
221         responseHeader(exchange, field);
222     }
223 
224     @Override
225     public boolean headerComplete()
226     {
227         HttpExchange exchange = getHttpExchange();
228         if (exchange == null)
229             return false;
230 
231         return !responseHeaders(exchange);
232     }
233 
234     @Override
235     public boolean content(ByteBuffer buffer)
236     {
237         HttpExchange exchange = getHttpExchange();
238         if (exchange == null)
239             return false;
240 
241         CompletableCallback callback = new CompletableCallback()
242         {
243             @Override
244             public void resume()
245             {
246                 if (LOG.isDebugEnabled())
247                     LOG.debug("Content consumed asynchronously, resuming processing");
248                 process();
249             }
250 
251             public void abort(Throwable x)
252             {
253                 failAndClose(x);
254             }
255         };
256         // Do not short circuit these calls.
257         boolean proceed = responseContent(exchange, buffer, callback);
258         boolean async = callback.tryComplete();
259         return !proceed || async;
260     }
261 
262     @Override
263     public boolean messageComplete()
264     {
265         HttpExchange exchange = getHttpExchange();
266         if (exchange == null)
267             return false;
268 
269         boolean proceed = responseSuccess(exchange);
270         if (!proceed)
271             return true;
272 
273         int status = exchange.getResponse().getStatus();
274         if (status == HttpStatus.SWITCHING_PROTOCOLS_101)
275             return true;
276 
277         if (HttpMethod.CONNECT.is(exchange.getRequest().getMethod()) &&
278                 status == HttpStatus.OK_200)
279             return true;
280 
281         return false;
282     }
283 
284     @Override
285     public void earlyEOF()
286     {
287         HttpExchange exchange = getHttpExchange();
288         HttpConnectionOverHTTP connection = getHttpConnection();
289         if (exchange == null)
290             connection.close();
291         else
292             failAndClose(new EOFException(String.valueOf(connection)));
293     }
294 
295     @Override
296     public void badMessage(int status, String reason)
297     {
298         HttpExchange exchange = getHttpExchange();
299         if (exchange != null)
300         {
301             HttpResponse response = exchange.getResponse();
302             response.status(status).reason(reason);
303             failAndClose(new HttpResponseException("HTTP protocol violation: bad response on " + getHttpConnection(), response));
304         }
305     }
306 
307     @Override
308     protected void reset()
309     {
310         super.reset();
311         parser.reset();
312     }
313 
314     @Override
315     protected void dispose()
316     {
317         super.dispose();
318         parser.close();
319     }
320 
321     private void failAndClose(Throwable failure)
322     {
323         if (responseFailure(failure))
324             getHttpConnection().close(failure);
325     }
326 
327     @Override
328     public String toString()
329     {
330         return String.format("%s[%s]", super.toString(), parser);
331     }
332 }