1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.InterruptedIOException;
19
20 import org.eclipse.jetty.client.security.Authorization;
21 import org.eclipse.jetty.http.HttpFields;
22 import org.eclipse.jetty.http.HttpGenerator;
23 import org.eclipse.jetty.http.HttpHeaderValues;
24 import org.eclipse.jetty.http.HttpHeaders;
25 import org.eclipse.jetty.http.HttpParser;
26 import org.eclipse.jetty.http.HttpSchemes;
27 import org.eclipse.jetty.http.HttpVersions;
28 import org.eclipse.jetty.http.ssl.SslSelectChannelEndPoint;
29 import org.eclipse.jetty.io.Buffer;
30 import org.eclipse.jetty.io.Buffers;
31 import org.eclipse.jetty.io.ByteArrayBuffer;
32 import org.eclipse.jetty.io.Connection;
33 import org.eclipse.jetty.io.EndPoint;
34 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.thread.Timeout;
37
38
39
40
41
42 public class HttpConnection implements Connection
43 {
44 private HttpDestination _destination;
45 private EndPoint _endp;
46 private HttpGenerator _generator;
47 private HttpParser _parser;
48 private boolean _http11 = true;
49 private Buffer _connectionHeader;
50 private Buffer _requestContentChunk;
51 private long _last;
52 private boolean _requestComplete;
53 private boolean _reserved;
54
55 private volatile HttpExchange _exchange;
56 private HttpExchange _pipeline;
57 private final Timeout.Task _timeout = new TimeoutTask();
58
59 public void dump() throws IOException
60 {
61 Log.info("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
62 Log.info("generator=" + _generator);
63 Log.info("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
64 Log.info("exchange=" + _exchange);
65 if (_endp instanceof SslSelectChannelEndPoint)
66 ((SslSelectChannelEndPoint)_endp).dump();
67 }
68
69 HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
70 {
71 _endp = endp;
72 _generator = new HttpGenerator(requestBuffers,endp);
73 _parser = new HttpParser(responseBuffers,endp,new Handler());
74 }
75
76 public void setReserved (boolean reserved)
77 {
78 _reserved = reserved;
79 }
80
81 public boolean isReserved()
82 {
83 return _reserved;
84 }
85
86 public HttpDestination getDestination()
87 {
88 return _destination;
89 }
90
91 public void setDestination(HttpDestination destination)
92 {
93 _destination = destination;
94 }
95
96 public boolean send(HttpExchange ex) throws IOException
97 {
98 synchronized (this)
99 {
100 if (_exchange != null)
101 {
102 if (_pipeline != null)
103 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange);
104 _pipeline = ex;
105 return true;
106 }
107
108 if (!_endp.isOpen())
109 return false;
110
111 _exchange = ex;
112 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
113
114 if (_endp.isBlocking())
115 {
116 this.notify();
117 }
118 else
119 {
120 SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
121 scep.scheduleWrite();
122 }
123 _destination.getHttpClient().schedule(_timeout);
124
125 return true;
126 }
127 }
128
129 public void handle() throws IOException
130 {
131 int no_progress = 0;
132 long flushed = 0;
133
134 boolean failed = false;
135 while (_endp.isBufferingInput() || _endp.isOpen())
136 {
137 synchronized (this)
138 {
139 while (_exchange == null)
140 {
141 if (_endp.isBlocking())
142 {
143 try
144 {
145 this.wait();
146 }
147 catch (InterruptedException e)
148 {
149 throw new InterruptedIOException();
150 }
151 }
152 else
153 {
154
155 _parser.fill();
156 _parser.skipCRLF();
157 if (_parser.isMoreInBuffer())
158 {
159 Log.warn("Unexpected data received but no request sent");
160 close();
161 }
162 return;
163 }
164 }
165 }
166 if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
167 {
168 no_progress = 0;
169 commitRequest();
170 }
171
172 try
173 {
174 long io = 0;
175 _endp.flush();
176
177 if (_generator.isComplete())
178 {
179 if (!_requestComplete)
180 {
181 _requestComplete = true;
182 _exchange.getEventListener().onRequestComplete();
183 }
184 }
185 else
186 {
187
188 synchronized (this)
189 {
190 if (_exchange == null)
191 continue;
192 flushed = _generator.flushBuffer();
193 io += flushed;
194 }
195
196 if (!_generator.isComplete())
197 {
198 InputStream in = _exchange.getRequestContentSource();
199 if (in != null)
200 {
201 if (_requestContentChunk == null || _requestContentChunk.length() == 0)
202 {
203 _requestContentChunk = _exchange.getRequestContentChunk();
204 if (_requestContentChunk != null)
205 _generator.addContent(_requestContentChunk,false);
206 else
207 _generator.complete();
208 io += _generator.flushBuffer();
209 }
210 }
211 else
212 _generator.complete();
213 }
214 }
215
216 if (_generator.isComplete() && !_requestComplete)
217 {
218 _requestComplete = true;
219 _exchange.getEventListener().onRequestComplete();
220 }
221
222
223 if (!_parser.isComplete() && _generator.isCommitted())
224 {
225 long filled = _parser.parseAvailable();
226 io += filled;
227 }
228
229 if (io > 0)
230 no_progress = 0;
231 else if (no_progress++ >= 2 && !_endp.isBlocking())
232 {
233
234 if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
235 {
236 if (_generator.flushBuffer()>0)
237 continue;
238 }
239 return;
240 }
241 }
242 catch (Throwable e)
243 {
244 Log.debug("Failure on " + _exchange, e);
245
246 if (e instanceof ThreadDeath)
247 throw (ThreadDeath)e;
248
249 synchronized (this)
250 {
251 if (_exchange != null)
252 {
253 _exchange.getEventListener().onException(e);
254 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
255 }
256 }
257
258 failed = true;
259 if (e instanceof IOException)
260 throw (IOException)e;
261
262 if (e instanceof Error)
263 throw (Error)e;
264
265 if (e instanceof RuntimeException)
266 throw (RuntimeException)e;
267
268 throw new RuntimeException(e);
269 }
270 finally
271 {
272 boolean complete = false;
273 boolean close = failed;
274 if (!failed)
275 {
276
277 if (_generator.isComplete())
278 {
279 if (!_requestComplete)
280 {
281 _requestComplete = true;
282 _exchange.getEventListener().onRequestComplete();
283 }
284
285
286
287 if (_parser.isComplete())
288 {
289 _destination.getHttpClient().cancel(_timeout);
290 complete = true;
291 }
292 }
293 }
294
295 if (complete || failed)
296 {
297 synchronized (this)
298 {
299 if (!close)
300 close = shouldClose();
301
302 reset(true);
303
304 no_progress = 0;
305 if (_exchange != null)
306 {
307 _exchange = null;
308
309 if (_pipeline == null)
310 {
311 if (!isReserved())
312 _destination.returnConnection(this,close);
313 }
314 else
315 {
316 if (close)
317 {
318 if (!isReserved())
319 _destination.returnConnection(this,close);
320
321 HttpExchange exchange = _pipeline;
322 _pipeline = null;
323 _destination.send(exchange);
324 }
325 else
326 {
327 HttpExchange exchange = _pipeline;
328 _pipeline = null;
329 send(exchange);
330 }
331 }
332 }
333 }
334 }
335 }
336 }
337 }
338
339 public boolean isIdle()
340 {
341 synchronized (this)
342 {
343 return _exchange == null;
344 }
345 }
346
347
348
349
350 public boolean isSuspended()
351 {
352 return false;
353 }
354
355 public EndPoint getEndPoint()
356 {
357 return _endp;
358 }
359
360 private void commitRequest() throws IOException
361 {
362 synchronized (this)
363 {
364 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
365 throw new IllegalStateException();
366
367 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
368 _generator.setVersion(_exchange.getVersion());
369
370 String uri = _exchange.getURI();
371 if (_destination.isProxied() && uri.startsWith("/"))
372 {
373
374 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
375 + _destination.getAddress().getPort() + uri;
376 Authorization auth = _destination.getProxyAuthentication();
377 if (auth != null)
378 auth.setCredentials(_exchange);
379 }
380
381 _generator.setRequest(_exchange.getMethod(), uri);
382
383 HttpFields requestHeaders = _exchange.getRequestFields();
384 if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
385 {
386 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
387 requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
388 }
389
390 Buffer requestContent = _exchange.getRequestContent();
391 if (requestContent != null)
392 {
393 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
394 _generator.completeHeader(requestHeaders,false);
395 _generator.addContent(requestContent,true);
396 }
397 else
398 {
399 InputStream requestContentStream = _exchange.getRequestContentSource();
400 if (requestContentStream != null)
401 {
402 _generator.completeHeader(requestHeaders, false);
403 int available = requestContentStream.available();
404 if (available > 0)
405 {
406
407
408 byte[] buf = new byte[available];
409 int length = requestContentStream.read(buf);
410 _generator.addContent(new ByteArrayBuffer(buf,0,length),false);
411 }
412 }
413 else
414 {
415 requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
416 _generator.completeHeader(requestHeaders, true);
417 }
418 }
419
420 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
421 }
422 }
423
424 protected void reset(boolean returnBuffers) throws IOException
425 {
426 _requestComplete = false;
427 _connectionHeader = null;
428 _parser.reset(returnBuffers);
429 _generator.reset(returnBuffers);
430 _http11 = true;
431 }
432
433 private boolean shouldClose()
434 {
435 if (_connectionHeader!=null)
436 {
437 if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
438 return true;
439 if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
440 return false;
441 }
442 return !_http11;
443 }
444
445 private class Handler extends HttpParser.EventHandler
446 {
447 @Override
448 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
449 {
450
451
452
453
454
455 }
456
457 @Override
458 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
459 {
460 HttpExchange exchange = _exchange;
461 if (exchange!=null)
462 {
463 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
464 exchange.getEventListener().onResponseStatus(version,status,reason);
465 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
466 }
467 }
468
469 @Override
470 public void parsedHeader(Buffer name, Buffer value) throws IOException
471 {
472 HttpExchange exchange = _exchange;
473 if (exchange!=null)
474 {
475 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
476 {
477 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
478 }
479 exchange.getEventListener().onResponseHeader(name,value);
480 }
481 }
482
483 @Override
484 public void headerComplete() throws IOException
485 {
486 HttpExchange exchange = _exchange;
487 if (exchange!=null)
488 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
489 }
490
491 @Override
492 public void content(Buffer ref) throws IOException
493 {
494 HttpExchange exchange = _exchange;
495 if (exchange!=null)
496 exchange.getEventListener().onResponseContent(ref);
497 }
498
499 @Override
500 public void messageComplete(long contextLength) throws IOException
501 {
502 HttpExchange exchange = _exchange;
503 if (exchange!=null)
504 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
505 }
506 }
507
508 public String toString()
509 {
510 return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
511 }
512
513 public String toDetailString()
514 {
515 return toString() + " ex=" + _exchange + " " + _timeout.getAge();
516 }
517
518
519
520
521 public long getLast()
522 {
523 return _last;
524 }
525
526
527
528
529
530 public void setLast(long last)
531 {
532 _last = last;
533 }
534
535 public void close() throws IOException
536 {
537 _endp.close();
538 }
539
540 private class TimeoutTask extends Timeout.Task
541 {
542 public void expired()
543 {
544 HttpExchange ex = null;
545 try
546 {
547 synchronized (HttpConnection.this)
548 {
549 ex = _exchange;
550 _exchange = null;
551 if (ex != null)
552 {
553 _destination.returnConnection(HttpConnection.this, true);
554 }
555 }
556 }
557 catch (Exception e)
558 {
559 Log.debug(e);
560 }
561 finally
562 {
563 try
564 {
565 close();
566 }
567 catch (IOException e)
568 {
569 Log.ignore(e);
570 }
571
572 if (ex != null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
573 {
574 ex.setStatus(HttpExchange.STATUS_EXPIRED);
575 }
576 }
577 }
578 }
579 }