1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.EOFException;
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.io.InterruptedIOException;
20 import java.util.Collections;
21 import java.util.concurrent.atomic.AtomicBoolean;
22
23 import org.eclipse.jetty.client.security.Authentication;
24 import org.eclipse.jetty.http.HttpFields;
25 import org.eclipse.jetty.http.HttpGenerator;
26 import org.eclipse.jetty.http.HttpHeaderValues;
27 import org.eclipse.jetty.http.HttpHeaders;
28 import org.eclipse.jetty.http.HttpMethods;
29 import org.eclipse.jetty.http.HttpParser;
30 import org.eclipse.jetty.http.HttpSchemes;
31 import org.eclipse.jetty.http.HttpStatus;
32 import org.eclipse.jetty.http.HttpVersions;
33 import org.eclipse.jetty.io.AbstractConnection;
34 import org.eclipse.jetty.io.AsyncEndPoint;
35 import org.eclipse.jetty.io.Buffer;
36 import org.eclipse.jetty.io.Buffers;
37 import org.eclipse.jetty.io.ByteArrayBuffer;
38 import org.eclipse.jetty.io.Connection;
39 import org.eclipse.jetty.io.EndPoint;
40 import org.eclipse.jetty.io.View;
41 import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
42 import org.eclipse.jetty.util.component.AggregateLifeCycle;
43 import org.eclipse.jetty.util.component.Dumpable;
44 import org.eclipse.jetty.util.log.Log;
45 import org.eclipse.jetty.util.thread.Timeout;
46
47
48
49
50
51 public class HttpConnection extends AbstractConnection implements Dumpable
52 {
53 private HttpDestination _destination;
54 private HttpGenerator _generator;
55 private HttpParser _parser;
56 private boolean _http11 = true;
57 private int _status;
58 private Buffer _connectionHeader;
59 private Buffer _requestContentChunk;
60 private boolean _requestComplete;
61 private boolean _reserved;
62
63
64 private volatile HttpExchange _exchange;
65 private HttpExchange _pipeline;
66 private final Timeout.Task _idleTimeout = new ConnectionIdleTask();
67 private AtomicBoolean _idle = new AtomicBoolean(false);
68
69
70 HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
71 {
72 super(endp);
73
74 _generator = new HttpGenerator(requestBuffers,endp);
75 _parser = new HttpParser(responseBuffers,endp,new Handler());
76 }
77
78 public void setReserved (boolean reserved)
79 {
80 _reserved = reserved;
81 }
82
83 public boolean isReserved()
84 {
85 return _reserved;
86 }
87
88 public HttpDestination getDestination()
89 {
90 return _destination;
91 }
92
93 public void setDestination(HttpDestination destination)
94 {
95 _destination = destination;
96 }
97
98 public boolean send(HttpExchange ex) throws IOException
99 {
100 synchronized (this)
101 {
102 if (_exchange != null)
103 {
104 if (_pipeline != null)
105 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange);
106 _pipeline = ex;
107 return true;
108 }
109
110 _exchange = ex;
111 _exchange.associate(this);
112
113
114 if (!_endp.isOpen())
115 {
116 _exchange.disassociate();
117 _exchange = null;
118 return false;
119 }
120
121 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
122
123 if (_endp.isBlocking())
124 {
125 this.notify();
126 }
127 else
128 {
129 AsyncEndPoint scep = (AsyncEndPoint)_endp;
130 scep.scheduleWrite();
131 }
132
133 adjustIdleTimeout();
134
135 return true;
136 }
137 }
138
139 private void adjustIdleTimeout() throws IOException
140 {
141
142
143
144
145
146 long timeout = _exchange.getTimeout();
147 if (timeout <= 0)
148 timeout = _destination.getHttpClient().getTimeout();
149
150 long endPointTimeout = _endp.getMaxIdleTime();
151
152 if (timeout > 0 && timeout > endPointTimeout)
153 {
154
155
156
157 _endp.setMaxIdleTime(2 * (int)timeout);
158 }
159 }
160
161 public Connection handle() throws IOException
162 {
163 try
164 {
165 int no_progress = 0;
166
167 boolean failed = false;
168 while (_endp.isBufferingInput() || _endp.isOpen())
169 {
170 synchronized (this)
171 {
172 while (_exchange == null)
173 {
174 if (_endp.isBlocking())
175 {
176 try
177 {
178 this.wait();
179 }
180 catch (InterruptedException e)
181 {
182 throw new InterruptedIOException();
183 }
184 }
185 else
186 {
187 long filled = _parser.fill();
188 if (filled < 0)
189 {
190 close();
191 }
192 else
193 {
194
195 _parser.skipCRLF();
196 if (_parser.isMoreInBuffer())
197 {
198 Log.warn("Unexpected data received but no request sent");
199 close();
200 }
201 }
202 return this;
203 }
204 }
205 }
206
207 try
208 {
209 if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
210 {
211 no_progress = 0;
212 commitRequest();
213 }
214
215 long io = 0;
216 _endp.flush();
217
218 if (_generator.isComplete())
219 {
220 if (!_requestComplete)
221 {
222 _requestComplete = true;
223 _exchange.getEventListener().onRequestComplete();
224 }
225 }
226 else
227 {
228
229 synchronized (this)
230 {
231 if (_exchange == null)
232 continue;
233 }
234
235 long flushed = _generator.flushBuffer();
236 io += flushed;
237
238 if (!_generator.isComplete())
239 {
240 if (_exchange!=null)
241 {
242 InputStream in = _exchange.getRequestContentSource();
243 if (in != null)
244 {
245 if (_requestContentChunk == null || _requestContentChunk.length() == 0)
246 {
247 _requestContentChunk = _exchange.getRequestContentChunk();
248
249 if (_requestContentChunk != null)
250 _generator.addContent(_requestContentChunk,false);
251 else
252 _generator.complete();
253
254 flushed = _generator.flushBuffer();
255 io += flushed;
256 }
257 }
258 else
259 _generator.complete();
260 }
261 else
262 _generator.complete();
263 }
264 }
265
266 if (_generator.isComplete() && !_requestComplete)
267 {
268 _requestComplete = true;
269 _exchange.getEventListener().onRequestComplete();
270 }
271
272
273 if (!_parser.isComplete() && (_generator.isComplete() || _generator.isCommitted() && !_endp.isBlocking()))
274 {
275 long filled = _parser.parseAvailable();
276 io += filled;
277 }
278
279 if (io > 0)
280 no_progress = 0;
281 else if (no_progress++ >= 1 && !_endp.isBlocking())
282 {
283
284 if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
285 {
286 long flushed = _generator.flushBuffer();
287 if (flushed>0)
288 continue;
289 }
290 return this;
291 }
292 }
293 catch (Throwable e)
294 {
295 Log.debug("Failure on " + _exchange, e);
296
297 if (e instanceof ThreadDeath)
298 throw (ThreadDeath)e;
299
300 failed = true;
301
302 synchronized (this)
303 {
304 if (_exchange != null)
305 {
306
307
308 if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
309 _exchange.getStatus() != HttpExchange.STATUS_CANCELLED)
310 {
311 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
312 _exchange.getEventListener().onException(e);
313 }
314 }
315 else
316 {
317 if (e instanceof IOException)
318 throw (IOException)e;
319
320 if (e instanceof Error)
321 throw (Error)e;
322
323 if (e instanceof RuntimeException)
324 throw (RuntimeException)e;
325
326 throw new RuntimeException(e);
327 }
328 }
329 }
330 finally
331 {
332 boolean complete = false;
333 boolean close = failed;
334 if (!failed)
335 {
336
337 if (_generator.isComplete())
338 {
339 if (!_requestComplete)
340 {
341 _requestComplete = true;
342 _exchange.getEventListener().onRequestComplete();
343 }
344
345
346
347 if (_parser.isComplete())
348 {
349 _exchange.cancelTimeout(_destination.getHttpClient());
350 complete = true;
351 }
352 }
353 }
354
355 if (_generator.isComplete() && !_parser.isComplete())
356 {
357 if (!_endp.isOpen() || _endp.isInputShutdown())
358 {
359 complete=true;
360 close=true;
361 close();
362 }
363 }
364
365 if (complete || failed)
366 {
367 synchronized (this)
368 {
369 if (!close)
370 close = shouldClose();
371
372 reset(true);
373
374 no_progress = 0;
375 if (_exchange != null)
376 {
377 HttpExchange exchange=_exchange;
378 _exchange = null;
379
380
381 if (!close)
382 _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
383
384 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
385 {
386 Connection switched=exchange.onSwitchProtocol(_endp);
387 if (switched!=null)
388 {
389
390 exchange = _pipeline;
391 _pipeline = null;
392 if (exchange!=null)
393 _destination.send(exchange);
394
395 return switched;
396 }
397 }
398
399 if (_pipeline == null)
400 {
401 if (!isReserved())
402 _destination.returnConnection(this, close);
403 }
404 else
405 {
406 if (close)
407 {
408 if (!isReserved())
409 _destination.returnConnection(this,close);
410
411 exchange = _pipeline;
412 _pipeline = null;
413 _destination.send(exchange);
414 }
415 else
416 {
417 exchange = _pipeline;
418 _pipeline = null;
419 send(exchange);
420 }
421 }
422 }
423 }
424 }
425 }
426 }
427 }
428 finally
429 {
430 _parser.returnBuffers();
431
432
433 if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp instanceof AsyncEndPoint)
434 {
435
436 ((AsyncEndPoint)_endp).scheduleWrite();
437 }
438 }
439
440 return this;
441 }
442
443 public boolean isIdle()
444 {
445 synchronized (this)
446 {
447 return _exchange == null;
448 }
449 }
450
451 public boolean isSuspended()
452 {
453 return false;
454 }
455
456 public void closed()
457 {
458 }
459
460 private void commitRequest() throws IOException
461 {
462 synchronized (this)
463 {
464 _status=0;
465 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
466 throw new IllegalStateException();
467
468 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
469 _generator.setVersion(_exchange.getVersion());
470
471 String method=_exchange.getMethod();
472 String uri = _exchange.getURI();
473 if (_destination.isProxied() && !HttpMethods.CONNECT.equals(method) && uri.startsWith("/"))
474 {
475 boolean secure = _destination.isSecure();
476 String host = _destination.getAddress().getHost();
477 int port = _destination.getAddress().getPort();
478 StringBuilder absoluteURI = new StringBuilder();
479 absoluteURI.append(secure ? HttpSchemes.HTTPS : HttpSchemes.HTTP);
480 absoluteURI.append("://");
481 absoluteURI.append(host);
482
483 if (!(secure && port == 443 || !secure && port == 80))
484 absoluteURI.append(":").append(port);
485 absoluteURI.append(uri);
486 uri = absoluteURI.toString();
487 Authentication auth = _destination.getProxyAuthentication();
488 if (auth != null)
489 auth.setCredentials(_exchange);
490 }
491
492 _generator.setRequest(method, uri);
493 _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
494
495 HttpFields requestHeaders = _exchange.getRequestFields();
496 if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
497 {
498 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
499 requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
500 }
501
502 Buffer requestContent = _exchange.getRequestContent();
503 if (requestContent != null)
504 {
505 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
506 _generator.completeHeader(requestHeaders,false);
507 _generator.addContent(new View(requestContent),true);
508 }
509 else
510 {
511 InputStream requestContentStream = _exchange.getRequestContentSource();
512 if (requestContentStream != null)
513 {
514 _generator.completeHeader(requestHeaders, false);
515 int available = requestContentStream.available();
516 if (available > 0)
517 {
518
519
520 byte[] buf = new byte[available];
521 int length = requestContentStream.read(buf);
522 _generator.addContent(new ByteArrayBuffer(buf, 0, length), false);
523 }
524 }
525 else
526 {
527 requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
528 _generator.completeHeader(requestHeaders, true);
529 }
530 }
531
532 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
533 }
534 }
535
536 protected void reset(boolean returnBuffers) throws IOException
537 {
538 _requestComplete = false;
539 _connectionHeader = null;
540 _parser.reset();
541 if (returnBuffers)
542 _parser.returnBuffers();
543 _generator.reset(returnBuffers);
544 _http11 = true;
545 }
546
547 private boolean shouldClose()
548 {
549 if (_connectionHeader!=null)
550 {
551 if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
552 return true;
553 if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
554 return false;
555 }
556 return !_http11;
557 }
558
559 private class Handler extends HttpParser.EventHandler
560 {
561 @Override
562 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
563 {
564
565
566
567
568
569 }
570
571 @Override
572 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
573 {
574
575 HttpExchange exchange = _exchange;
576 if (exchange!=null)
577 {
578
579 if (status==HttpStatus.OK_200 && HttpMethods.CONNECT.equalsIgnoreCase(exchange.getMethod()))
580 _parser.setHeadResponse(true);
581
582 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
583 _status=status;
584 exchange.getEventListener().onResponseStatus(version,status,reason);
585 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
586 }
587 }
588
589 @Override
590 public void parsedHeader(Buffer name, Buffer value) throws IOException
591 {
592 HttpExchange exchange = _exchange;
593 if (exchange!=null)
594 {
595 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
596 {
597 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
598 }
599 exchange.getEventListener().onResponseHeader(name,value);
600 }
601 }
602
603 @Override
604 public void headerComplete() throws IOException
605 {
606 if (_endp instanceof AsyncEndPoint)
607 ((AsyncEndPoint)_endp).scheduleIdle();
608 HttpExchange exchange = _exchange;
609 if (exchange!=null)
610 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
611 }
612
613 @Override
614 public void content(Buffer ref) throws IOException
615 {
616 if (_endp instanceof AsyncEndPoint)
617 ((AsyncEndPoint)_endp).scheduleIdle();
618 HttpExchange exchange = _exchange;
619 if (exchange!=null)
620 exchange.getEventListener().onResponseContent(ref);
621 }
622
623 @Override
624 public void messageComplete(long contextLength) throws IOException
625 {
626 HttpExchange exchange = _exchange;
627 if (exchange!=null)
628 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
629 }
630 }
631
632 @Override
633 public String toString()
634 {
635 return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
636 }
637
638 public String toDetailString()
639 {
640 return toString() + " ex=" + _exchange + " idle for " + _idleTimeout.getAge();
641 }
642
643 public void close() throws IOException
644 {
645
646
647
648 HttpExchange exchange = _exchange;
649 if (exchange != null && !exchange.isDone())
650 {
651 switch (exchange.getStatus())
652 {
653 case HttpExchange.STATUS_CANCELLED:
654 case HttpExchange.STATUS_CANCELLING:
655 case HttpExchange.STATUS_COMPLETED:
656 case HttpExchange.STATUS_EXCEPTED:
657 case HttpExchange.STATUS_EXPIRED:
658 break;
659 default:
660 String exch= exchange.toString();
661 String reason = _endp.isOpen()?(_endp.isInputShutdown()?"half closed: ":"local close: "):"closed: ";
662 exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
663 exchange.getEventListener().onException(new EOFException(reason+exch));
664 }
665 }
666
667 _endp.close();
668 }
669
670 public void setIdleTimeout()
671 {
672 synchronized (this)
673 {
674 if (_idle.compareAndSet(false, true))
675 _destination.getHttpClient().scheduleIdle(_idleTimeout);
676 else
677 throw new IllegalStateException();
678 }
679 }
680
681 public boolean cancelIdleTimeout()
682 {
683 synchronized (this)
684 {
685 if (_idle.compareAndSet(true, false))
686 {
687 _destination.getHttpClient().cancel(_idleTimeout);
688 return true;
689 }
690 }
691
692 return false;
693 }
694
695 protected void exchangeExpired(HttpExchange exchange)
696 {
697 synchronized (this)
698 {
699
700
701 if (_exchange == exchange)
702 {
703 try
704 {
705 _destination.returnConnection(this, true);
706 }
707 catch (IOException x)
708 {
709 Log.ignore(x);
710 }
711 }
712 }
713 }
714
715
716
717
718
719 public String dump()
720 {
721 return AggregateLifeCycle.dump(this);
722 }
723
724
725
726
727
728 public void dump(Appendable out, String indent) throws IOException
729 {
730 synchronized (this)
731 {
732 out.append(String.valueOf(this)).append("\n");
733 AggregateLifeCycle.dump(out,indent,Collections.singletonList(_endp));
734 }
735 }
736
737 private class ConnectionIdleTask extends Timeout.Task
738 {
739 @Override
740 public void expired()
741 {
742
743 if (_idle.compareAndSet(true, false))
744 {
745 _destination.returnIdleConnection(HttpConnection.this);
746 }
747 }
748 }
749 }