View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client.http;
20  
21  import java.nio.ByteBuffer;
22  
23  import org.eclipse.jetty.client.HttpClient;
24  import org.eclipse.jetty.client.HttpContent;
25  import org.eclipse.jetty.client.HttpExchange;
26  import org.eclipse.jetty.client.HttpRequestException;
27  import org.eclipse.jetty.client.HttpSender;
28  import org.eclipse.jetty.client.api.ContentProvider;
29  import org.eclipse.jetty.client.api.Request;
30  import org.eclipse.jetty.http.HttpGenerator;
31  import org.eclipse.jetty.http.HttpURI;
32  import org.eclipse.jetty.http.MetaData;
33  import org.eclipse.jetty.io.ByteBufferPool;
34  import org.eclipse.jetty.io.EndPoint;
35  import org.eclipse.jetty.util.Callback;
36  import org.eclipse.jetty.util.IteratingCallback;
37  
38  public class HttpSenderOverHTTP extends HttpSender
39  {
40      private final HttpGenerator generator = new HttpGenerator();
41  
42      public HttpSenderOverHTTP(HttpChannelOverHTTP channel)
43      {
44          super(channel);
45      }
46  
47      @Override
48      public HttpChannelOverHTTP getHttpChannel()
49      {
50          return (HttpChannelOverHTTP)super.getHttpChannel();
51      }
52  
53      @Override
54      protected void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback)
55      {
56          try
57          {
58              new HeadersCallback(exchange, content, callback).iterate();
59          }
60          catch (Throwable x)
61          {
62              if (LOG.isDebugEnabled())
63                  LOG.debug(x);
64              callback.failed(x);
65          }
66      }
67  
68      @Override
69      protected void sendContent(HttpExchange exchange, HttpContent content, Callback callback)
70      {
71          try
72          {
73              HttpClient client = getHttpChannel().getHttpDestination().getHttpClient();
74              ByteBufferPool bufferPool = client.getByteBufferPool();
75              ByteBuffer chunk = null;
76              while (true)
77              {
78                  ByteBuffer contentBuffer = content.getByteBuffer();
79                  boolean lastContent = content.isLast();
80                  HttpGenerator.Result result = generator.generateRequest(null, null, chunk, contentBuffer, lastContent);
81                  if (LOG.isDebugEnabled())
82                      LOG.debug("Generated content ({} bytes) - {}/{}",
83                              contentBuffer == null ? -1 : contentBuffer.remaining(),
84                              result, generator);
85                  switch (result)
86                  {
87                      case NEED_CHUNK:
88                      {
89                          chunk = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
90                          break;
91                      }
92                      case FLUSH:
93                      {
94                          EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
95                          if (chunk != null)
96                              endPoint.write(new ByteBufferRecyclerCallback(callback, bufferPool, chunk), chunk, contentBuffer);
97                          else
98                              endPoint.write(callback, contentBuffer);
99                          return;
100                     }
101                     case SHUTDOWN_OUT:
102                     {
103                         shutdownOutput();
104                         break;
105                     }
106                     case CONTINUE:
107                     {
108                         if (lastContent)
109                             break;
110                         callback.succeeded();
111                         return;
112                     }
113                     case DONE:
114                     {
115                         callback.succeeded();
116                         return;
117                     }
118                     default:
119                     {
120                         throw new IllegalStateException(result.toString());
121                     }
122                 }
123             }
124         }
125         catch (Throwable x)
126         {
127             if (LOG.isDebugEnabled())
128                 LOG.debug(x);
129             callback.failed(x);
130         }
131     }
132 
133     @Override
134     protected void reset()
135     {
136         generator.reset();
137         super.reset();
138     }
139 
140     @Override
141     protected void dispose()
142     {
143         generator.abort();
144         super.dispose();
145         shutdownOutput();
146     }
147 
148     private void shutdownOutput()
149     {
150         if (LOG.isDebugEnabled())
151             LOG.debug("Request shutdown output {}", getHttpExchange().getRequest());
152         getHttpChannel().getHttpConnection().getEndPoint().shutdownOutput();
153     }
154 
155     @Override
156     public String toString()
157     {
158         return String.format("%s[%s]", super.toString(), generator);
159     }
160 
161     private class HeadersCallback extends IteratingCallback
162     {
163         private final HttpExchange exchange;
164         private final Callback callback;
165         private final MetaData.Request metaData;
166         private ByteBuffer headerBuffer;
167         private ByteBuffer chunkBuffer;
168         private ByteBuffer contentBuffer;
169         private boolean lastContent;
170         private boolean generated;
171 
172         public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback)
173         {
174             super(false);
175             this.exchange = exchange;
176             this.callback = callback;
177 
178             Request request = exchange.getRequest();
179             ContentProvider requestContent = request.getContent();
180             long contentLength = requestContent == null ? -1 : requestContent.getLength();
181             String path = request.getPath();
182             String query = request.getQuery();
183             if (query != null)
184                 path += "?" + query;
185             metaData = new MetaData.Request(request.getMethod(), new HttpURI(path), request.getVersion(), request.getHeaders(), contentLength);
186 
187             if (!expects100Continue(request))
188             {
189                 content.advance();
190                 contentBuffer = content.getByteBuffer();
191                 lastContent = content.isLast();
192             }
193         }
194 
195         @Override
196         protected Action process() throws Exception
197         {
198             HttpClient client = getHttpChannel().getHttpDestination().getHttpClient();
199             ByteBufferPool bufferPool = client.getByteBufferPool();
200 
201             while (true)
202             {
203                 HttpGenerator.Result result = generator.generateRequest(metaData, headerBuffer, chunkBuffer, contentBuffer, lastContent);
204                 if (LOG.isDebugEnabled())
205                     LOG.debug("Generated headers ({} bytes), chunk ({} bytes), content ({} bytes) - {}/{}",
206                             headerBuffer == null ? -1 : headerBuffer.remaining(),
207                             chunkBuffer == null ? -1 : chunkBuffer.remaining(),
208                             contentBuffer == null ? -1 : contentBuffer.remaining(),
209                             result, generator);
210                 switch (result)
211                 {
212                     case NEED_HEADER:
213                     {
214                         headerBuffer = bufferPool.acquire(client.getRequestBufferSize(), false);
215                         break;
216                     }
217                     case NEED_CHUNK:
218                     {
219                         chunkBuffer = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
220                         break;
221                     }
222                     case FLUSH:
223                     {
224                         EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
225                         if (chunkBuffer == null)
226                         {
227                             if (contentBuffer == null)
228                                 endPoint.write(this, headerBuffer);
229                             else
230                                 endPoint.write(this, headerBuffer, contentBuffer);
231                         }
232                         else
233                         {
234                             if (contentBuffer == null)
235                                 endPoint.write(this, headerBuffer, chunkBuffer);
236                             else
237                                 endPoint.write(this, headerBuffer, chunkBuffer, contentBuffer);
238                         }
239                         generated = true;
240                         return Action.SCHEDULED;
241                     }
242                     case SHUTDOWN_OUT:
243                     {
244                         shutdownOutput();
245                         return Action.SUCCEEDED;
246                     }
247                     case CONTINUE:
248                     {
249                         if (generated)
250                             return Action.SUCCEEDED;
251                         break;
252                     }
253                     case DONE:
254                     {
255                         if (generated)
256                             return Action.SUCCEEDED;
257                         // The headers have already been generated by some
258                         // other thread, perhaps by a concurrent abort().
259                         throw new HttpRequestException("Could not generate headers", exchange.getRequest());
260                     }
261                     default:
262                     {
263                         throw new IllegalStateException(result.toString());
264                     }
265                 }
266             }
267         }
268 
269         @Override
270         public void succeeded()
271         {
272             release();
273             super.succeeded();
274         }
275 
276         @Override
277         public void failed(Throwable x)
278         {
279             release();
280             callback.failed(x);
281             super.failed(x);
282         }
283 
284         @Override
285         protected void onCompleteSuccess()
286         {
287             super.onCompleteSuccess();
288             callback.succeeded();
289         }
290 
291         private void release()
292         {
293             HttpClient client = getHttpChannel().getHttpDestination().getHttpClient();
294             ByteBufferPool bufferPool = client.getByteBufferPool();
295             bufferPool.release(headerBuffer);
296             headerBuffer = null;
297             if (chunkBuffer != null)
298                 bufferPool.release(chunkBuffer);
299             chunkBuffer = null;
300         }
301     }
302 
303     private class ByteBufferRecyclerCallback implements Callback
304     {
305         private final Callback callback;
306         private final ByteBufferPool pool;
307         private final ByteBuffer[] buffers;
308 
309         private ByteBufferRecyclerCallback(Callback callback, ByteBufferPool pool, ByteBuffer... buffers)
310         {
311             this.callback = callback;
312             this.pool = pool;
313             this.buffers = buffers;
314         }
315 
316         @Override
317         public boolean isNonBlocking()
318         {
319             return callback.isNonBlocking();
320         }
321 
322         @Override
323         public void succeeded()
324         {
325             for (ByteBuffer buffer : buffers)
326             {
327                 assert !buffer.hasRemaining();
328                 pool.release(buffer);
329             }
330             callback.succeeded();
331         }
332 
333         @Override
334         public void failed(Throwable x)
335         {
336             for (ByteBuffer buffer : buffers)
337                 pool.release(buffer);
338             callback.failed(x);
339         }
340     }
341 }