1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.client.http;
20
21 import java.io.EOFException;
22 import java.nio.ByteBuffer;
23
24 import org.eclipse.jetty.client.HttpClient;
25 import org.eclipse.jetty.client.HttpExchange;
26 import org.eclipse.jetty.client.HttpReceiver;
27 import org.eclipse.jetty.client.HttpResponse;
28 import org.eclipse.jetty.client.HttpResponseException;
29 import org.eclipse.jetty.http.HttpField;
30 import org.eclipse.jetty.http.HttpMethod;
31 import org.eclipse.jetty.http.HttpParser;
32 import org.eclipse.jetty.http.HttpVersion;
33 import org.eclipse.jetty.io.ByteBufferPool;
34 import org.eclipse.jetty.io.EndPoint;
35 import org.eclipse.jetty.util.BufferUtil;
36 import org.eclipse.jetty.util.CompletableCallback;
37
38 public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
39 {
40 private final HttpParser parser = new HttpParser(this);
41 private ByteBuffer buffer;
42 private boolean shutdown;
43
44 public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
45 {
46 super(channel);
47 }
48
49 @Override
50 public HttpChannelOverHTTP getHttpChannel()
51 {
52 return (HttpChannelOverHTTP)super.getHttpChannel();
53 }
54
55 private HttpConnectionOverHTTP getHttpConnection()
56 {
57 return getHttpChannel().getHttpConnection();
58 }
59
60 public void receive()
61 {
62 HttpClient client = getHttpDestination().getHttpClient();
63 ByteBufferPool bufferPool = client.getByteBufferPool();
64 buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
65 process();
66 }
67
68 private void process()
69 {
70 if (readAndParse())
71 {
72 HttpClient client = getHttpDestination().getHttpClient();
73 ByteBufferPool bufferPool = client.getByteBufferPool();
74 bufferPool.release(buffer);
75
76 buffer = null;
77 }
78 }
79
80 private boolean readAndParse()
81 {
82 HttpConnectionOverHTTP connection = getHttpConnection();
83 EndPoint endPoint = connection.getEndPoint();
84 ByteBuffer buffer = this.buffer;
85 while (true)
86 {
87 try
88 {
89
90 if (connection.isClosed())
91 {
92 if (LOG.isDebugEnabled())
93 LOG.debug("{} closed", connection);
94 return true;
95 }
96
97 if (!parse(buffer))
98 return false;
99
100 int read = endPoint.fill(buffer);
101
102 if (LOG.isDebugEnabled())
103 LOG.debug("Read {} bytes from {}", read, endPoint);
104
105 if (read > 0)
106 {
107 if (!parse(buffer))
108 return false;
109 }
110 else if (read == 0)
111 {
112 fillInterested();
113 return true;
114 }
115 else
116 {
117 shutdown();
118 return true;
119 }
120 }
121 catch (Throwable x)
122 {
123 if (LOG.isDebugEnabled())
124 LOG.debug(x);
125 failAndClose(x);
126 return true;
127 }
128 }
129 }
130
131
132
133
134
135
136
137
138 private boolean parse(ByteBuffer buffer)
139 {
140
141
142 if (parser.parseNext(buffer))
143 {
144
145
146
147 return parser.isStart();
148 }
149 return true;
150 }
151
152 private void fillInterested()
153 {
154 getHttpChannel().getHttpConnection().fillInterested();
155 }
156
157 private void shutdown()
158 {
159
160
161
162
163 shutdown = true;
164
165
166
167
168
169 parser.atEOF();
170 parser.parseNext(BufferUtil.EMPTY_BUFFER);
171 }
172
173 protected boolean isShutdown()
174 {
175 return shutdown;
176 }
177
178 @Override
179 public int getHeaderCacheSize()
180 {
181
182 return 256;
183 }
184
185 @Override
186 public boolean startResponse(HttpVersion version, int status, String reason)
187 {
188 HttpExchange exchange = getHttpExchange();
189 if (exchange == null)
190 return false;
191
192 String method = exchange.getRequest().getMethod();
193 parser.setHeadResponse(HttpMethod.HEAD.is(method) || HttpMethod.CONNECT.is(method));
194 exchange.getResponse().version(version).status(status).reason(reason);
195
196 responseBegin(exchange);
197 return false;
198 }
199
200 @Override
201 public boolean parsedHeader(HttpField field)
202 {
203 HttpExchange exchange = getHttpExchange();
204 if (exchange == null)
205 return false;
206
207 responseHeader(exchange, field);
208 return false;
209 }
210
211 @Override
212 public boolean headerComplete()
213 {
214 HttpExchange exchange = getHttpExchange();
215 if (exchange == null)
216 return false;
217
218 responseHeaders(exchange);
219 return false;
220 }
221
222 @Override
223 public boolean content(ByteBuffer buffer)
224 {
225 HttpExchange exchange = getHttpExchange();
226 if (exchange == null)
227 return false;
228
229 CompletableCallback callback = new CompletableCallback()
230 {
231 @Override
232 public void resume()
233 {
234 if (LOG.isDebugEnabled())
235 LOG.debug("Content consumed asynchronously, resuming processing");
236 process();
237 }
238
239 public void abort(Throwable x)
240 {
241 failAndClose(x);
242 }
243 };
244 responseContent(exchange, buffer, callback);
245 return callback.tryComplete();
246 }
247
248 @Override
249 public boolean messageComplete()
250 {
251 HttpExchange exchange = getHttpExchange();
252 if (exchange == null)
253 return false;
254
255 responseSuccess(exchange);
256 return true;
257 }
258
259 @Override
260 public void earlyEOF()
261 {
262 HttpExchange exchange = getHttpExchange();
263 HttpConnectionOverHTTP connection = getHttpConnection();
264 if (exchange == null)
265 connection.close();
266 else
267 failAndClose(new EOFException(String.valueOf(connection)));
268 }
269
270 @Override
271 public void badMessage(int status, String reason)
272 {
273 HttpExchange exchange = getHttpExchange();
274 if (exchange != null)
275 {
276 HttpResponse response = exchange.getResponse();
277 response.status(status).reason(reason);
278 failAndClose(new HttpResponseException("HTTP protocol violation: bad response on " + getHttpConnection(), response));
279 }
280 }
281
282 @Override
283 protected void reset()
284 {
285 super.reset();
286 parser.reset();
287 }
288
289 @Override
290 protected void dispose()
291 {
292 super.dispose();
293 parser.close();
294 }
295
296 private void failAndClose(Throwable failure)
297 {
298 if (responseFailure(failure))
299 getHttpConnection().close(failure);
300 }
301
302 @Override
303 public String toString()
304 {
305 return String.format("%s@%x on %s", getClass().getSimpleName(), hashCode(), getHttpConnection());
306 }
307 }