1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client.http;
20
21 import java.nio.ByteBuffer;
22
23 import org.eclipse.jetty.client.HttpClient;
24 import org.eclipse.jetty.client.HttpContent;
25 import org.eclipse.jetty.client.HttpExchange;
26 import org.eclipse.jetty.client.HttpRequestException;
27 import org.eclipse.jetty.client.HttpSender;
28 import org.eclipse.jetty.client.api.ContentProvider;
29 import org.eclipse.jetty.client.api.Request;
30 import org.eclipse.jetty.http.HttpGenerator;
31 import org.eclipse.jetty.http.HttpURI;
32 import org.eclipse.jetty.http.MetaData;
33 import org.eclipse.jetty.io.ByteBufferPool;
34 import org.eclipse.jetty.io.EndPoint;
35 import org.eclipse.jetty.util.Callback;
36 import org.eclipse.jetty.util.IteratingCallback;
37
38 public class HttpSenderOverHTTP extends HttpSender
39 {
40 private final HttpGenerator generator = new HttpGenerator();
41
42 public HttpSenderOverHTTP(HttpChannelOverHTTP channel)
43 {
44 super(channel);
45 }
46
47 @Override
48 public HttpChannelOverHTTP getHttpChannel()
49 {
50 return (HttpChannelOverHTTP)super.getHttpChannel();
51 }
52
53 @Override
54 protected void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback)
55 {
56 try
57 {
58 new HeadersCallback(exchange, content, callback).iterate();
59 }
60 catch (Throwable x)
61 {
62 if (LOG.isDebugEnabled())
63 LOG.debug(x);
64 callback.failed(x);
65 }
66 }
67
68 @Override
69 protected void sendContent(HttpExchange exchange, HttpContent content, Callback callback)
70 {
71 try
72 {
73 HttpClient client = getHttpChannel().getHttpDestination().getHttpClient();
74 ByteBufferPool bufferPool = client.getByteBufferPool();
75 ByteBuffer chunk = null;
76 while (true)
77 {
78 ByteBuffer contentBuffer = content.getByteBuffer();
79 boolean lastContent = content.isLast();
80 HttpGenerator.Result result = generator.generateRequest(null, null, chunk, contentBuffer, lastContent);
81 if (LOG.isDebugEnabled())
82 LOG.debug("Generated content ({} bytes) - {}/{}",
83 contentBuffer == null ? -1 : contentBuffer.remaining(),
84 result, generator);
85 switch (result)
86 {
87 case NEED_CHUNK:
88 {
89 chunk = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
90 break;
91 }
92 case FLUSH:
93 {
94 EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
95 if (chunk != null)
96 endPoint.write(new ByteBufferRecyclerCallback(callback, bufferPool, chunk), chunk, contentBuffer);
97 else
98 endPoint.write(callback, contentBuffer);
99 return;
100 }
101 case SHUTDOWN_OUT:
102 {
103 shutdownOutput();
104 break;
105 }
106 case CONTINUE:
107 {
108 if (lastContent)
109 break;
110 callback.succeeded();
111 return;
112 }
113 case DONE:
114 {
115 callback.succeeded();
116 return;
117 }
118 default:
119 {
120 throw new IllegalStateException(result.toString());
121 }
122 }
123 }
124 }
125 catch (Throwable x)
126 {
127 if (LOG.isDebugEnabled())
128 LOG.debug(x);
129 callback.failed(x);
130 }
131 }
132
133 @Override
134 protected void reset()
135 {
136 generator.reset();
137 super.reset();
138 }
139
140 @Override
141 protected void dispose()
142 {
143 generator.abort();
144 super.dispose();
145 shutdownOutput();
146 }
147
148 private void shutdownOutput()
149 {
150 if (LOG.isDebugEnabled())
151 LOG.debug("Request shutdown output {}", getHttpExchange().getRequest());
152 getHttpChannel().getHttpConnection().getEndPoint().shutdownOutput();
153 }
154
155 @Override
156 public String toString()
157 {
158 return String.format("%s[%s]", super.toString(), generator);
159 }
160
161 private class HeadersCallback extends IteratingCallback
162 {
163 private final HttpExchange exchange;
164 private final Callback callback;
165 private final MetaData.Request metaData;
166 private ByteBuffer headerBuffer;
167 private ByteBuffer chunkBuffer;
168 private ByteBuffer contentBuffer;
169 private boolean lastContent;
170 private boolean generated;
171
172 public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback)
173 {
174 super(false);
175 this.exchange = exchange;
176 this.callback = callback;
177
178 Request request = exchange.getRequest();
179 ContentProvider requestContent = request.getContent();
180 long contentLength = requestContent == null ? -1 : requestContent.getLength();
181 String path = request.getPath();
182 String query = request.getQuery();
183 if (query != null)
184 path += "?" + query;
185 metaData = new MetaData.Request(request.getMethod(), new HttpURI(path), request.getVersion(), request.getHeaders(), contentLength);
186
187 if (!expects100Continue(request))
188 {
189 content.advance();
190 contentBuffer = content.getByteBuffer();
191 lastContent = content.isLast();
192 }
193 }
194
195 @Override
196 protected Action process() throws Exception
197 {
198 HttpClient client = getHttpChannel().getHttpDestination().getHttpClient();
199 ByteBufferPool bufferPool = client.getByteBufferPool();
200
201 while (true)
202 {
203 HttpGenerator.Result result = generator.generateRequest(metaData, headerBuffer, chunkBuffer, contentBuffer, lastContent);
204 if (LOG.isDebugEnabled())
205 LOG.debug("Generated headers ({} bytes), chunk ({} bytes), content ({} bytes) - {}/{}",
206 headerBuffer == null ? -1 : headerBuffer.remaining(),
207 chunkBuffer == null ? -1 : chunkBuffer.remaining(),
208 contentBuffer == null ? -1 : contentBuffer.remaining(),
209 result, generator);
210 switch (result)
211 {
212 case NEED_HEADER:
213 {
214 headerBuffer = bufferPool.acquire(client.getRequestBufferSize(), false);
215 break;
216 }
217 case NEED_CHUNK:
218 {
219 chunkBuffer = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
220 break;
221 }
222 case FLUSH:
223 {
224 EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
225 if (chunkBuffer == null)
226 {
227 if (contentBuffer == null)
228 endPoint.write(this, headerBuffer);
229 else
230 endPoint.write(this, headerBuffer, contentBuffer);
231 }
232 else
233 {
234 if (contentBuffer == null)
235 endPoint.write(this, headerBuffer, chunkBuffer);
236 else
237 endPoint.write(this, headerBuffer, chunkBuffer, contentBuffer);
238 }
239 generated = true;
240 return Action.SCHEDULED;
241 }
242 case SHUTDOWN_OUT:
243 {
244 shutdownOutput();
245 return Action.SUCCEEDED;
246 }
247 case CONTINUE:
248 {
249 if (generated)
250 return Action.SUCCEEDED;
251 break;
252 }
253 case DONE:
254 {
255 if (generated)
256 return Action.SUCCEEDED;
257
258
259 throw new HttpRequestException("Could not generate headers", exchange.getRequest());
260 }
261 default:
262 {
263 throw new IllegalStateException(result.toString());
264 }
265 }
266 }
267 }
268
269 @Override
270 public void succeeded()
271 {
272 release();
273 super.succeeded();
274 }
275
276 @Override
277 public void failed(Throwable x)
278 {
279 release();
280 callback.failed(x);
281 super.failed(x);
282 }
283
284 @Override
285 protected void onCompleteSuccess()
286 {
287 super.onCompleteSuccess();
288 callback.succeeded();
289 }
290
291 private void release()
292 {
293 HttpClient client = getHttpChannel().getHttpDestination().getHttpClient();
294 ByteBufferPool bufferPool = client.getByteBufferPool();
295 bufferPool.release(headerBuffer);
296 headerBuffer = null;
297 if (chunkBuffer != null)
298 bufferPool.release(chunkBuffer);
299 chunkBuffer = null;
300 }
301 }
302
303 private class ByteBufferRecyclerCallback implements Callback
304 {
305 private final Callback callback;
306 private final ByteBufferPool pool;
307 private final ByteBuffer[] buffers;
308
309 private ByteBufferRecyclerCallback(Callback callback, ByteBufferPool pool, ByteBuffer... buffers)
310 {
311 this.callback = callback;
312 this.pool = pool;
313 this.buffers = buffers;
314 }
315
316 @Override
317 public boolean isNonBlocking()
318 {
319 return callback.isNonBlocking();
320 }
321
322 @Override
323 public void succeeded()
324 {
325 for (ByteBuffer buffer : buffers)
326 {
327 assert !buffer.hasRemaining();
328 pool.release(buffer);
329 }
330 callback.succeeded();
331 }
332
333 @Override
334 public void failed(Throwable x)
335 {
336 for (ByteBuffer buffer : buffers)
337 pool.release(buffer);
338 callback.failed(x);
339 }
340 }
341 }