1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.EOFException;
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.io.InterruptedIOException;
20 import java.util.Collections;
21 import java.util.concurrent.atomic.AtomicBoolean;
22
23 import org.eclipse.jetty.client.security.Authentication;
24 import org.eclipse.jetty.http.HttpFields;
25 import org.eclipse.jetty.http.HttpGenerator;
26 import org.eclipse.jetty.http.HttpHeaderValues;
27 import org.eclipse.jetty.http.HttpHeaders;
28 import org.eclipse.jetty.http.HttpMethods;
29 import org.eclipse.jetty.http.HttpParser;
30 import org.eclipse.jetty.http.HttpSchemes;
31 import org.eclipse.jetty.http.HttpStatus;
32 import org.eclipse.jetty.http.HttpVersions;
33 import org.eclipse.jetty.io.AbstractConnection;
34 import org.eclipse.jetty.io.AsyncEndPoint;
35 import org.eclipse.jetty.io.Buffer;
36 import org.eclipse.jetty.io.Buffers;
37 import org.eclipse.jetty.io.ByteArrayBuffer;
38 import org.eclipse.jetty.io.Connection;
39 import org.eclipse.jetty.io.EndPoint;
40 import org.eclipse.jetty.io.EofException;
41 import org.eclipse.jetty.io.View;
42 import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
43 import org.eclipse.jetty.util.component.AggregateLifeCycle;
44 import org.eclipse.jetty.util.component.Dumpable;
45 import org.eclipse.jetty.util.log.Log;
46 import org.eclipse.jetty.util.log.Logger;
47 import org.eclipse.jetty.util.thread.Timeout;
48
49
50
51
52
53 public class HttpConnection extends AbstractConnection implements Dumpable
54 {
55 private static final Logger LOG = Log.getLogger(HttpConnection.class);
56
57 private HttpDestination _destination;
58 private HttpGenerator _generator;
59 private HttpParser _parser;
60 private boolean _http11 = true;
61 private int _status;
62 private Buffer _connectionHeader;
63 private Buffer _requestContentChunk;
64 private boolean _requestComplete;
65 private boolean _reserved;
66
67
68 private volatile HttpExchange _exchange;
69 private HttpExchange _pipeline;
70 private final Timeout.Task _idleTimeout = new ConnectionIdleTask();
71 private AtomicBoolean _idle = new AtomicBoolean(false);
72
73
74 HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
75 {
76 super(endp);
77
78 _generator = new HttpGenerator(requestBuffers,endp);
79 _parser = new HttpParser(responseBuffers,endp,new Handler());
80 }
81
82 public void setReserved (boolean reserved)
83 {
84 _reserved = reserved;
85 }
86
87 public boolean isReserved()
88 {
89 return _reserved;
90 }
91
92 public HttpDestination getDestination()
93 {
94 return _destination;
95 }
96
97 public void setDestination(HttpDestination destination)
98 {
99 _destination = destination;
100 }
101
102 public boolean send(HttpExchange ex) throws IOException
103 {
104 synchronized (this)
105 {
106 if (_exchange != null)
107 {
108 if (_pipeline != null)
109 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange);
110 _pipeline = ex;
111 return true;
112 }
113
114 _exchange = ex;
115 _exchange.associate(this);
116
117
118 if (!_endp.isOpen())
119 {
120 _exchange.disassociate();
121 _exchange = null;
122 return false;
123 }
124
125 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
126
127 if (_endp.isBlocking())
128 {
129 this.notify();
130 }
131 else
132 {
133 AsyncEndPoint scep = (AsyncEndPoint)_endp;
134 scep.scheduleWrite();
135 }
136
137 adjustIdleTimeout();
138
139 return true;
140 }
141 }
142
143 private void adjustIdleTimeout() throws IOException
144 {
145
146
147
148
149
150 long timeout = _exchange.getTimeout();
151 if (timeout <= 0)
152 timeout = _destination.getHttpClient().getTimeout();
153
154 long endPointTimeout = _endp.getMaxIdleTime();
155
156 if (timeout > 0 && timeout > endPointTimeout)
157 {
158
159
160
161 _endp.setMaxIdleTime(2 * (int)timeout);
162 }
163 }
164
165 public Connection handle() throws IOException
166 {
167 try
168 {
169 int no_progress = 0;
170
171 boolean failed = false;
172 while (_endp.isBufferingInput() || _endp.isOpen())
173 {
174 synchronized (this)
175 {
176 while (_exchange == null)
177 {
178 if (_endp.isBlocking())
179 {
180 try
181 {
182 this.wait();
183 }
184 catch (InterruptedException e)
185 {
186 throw new InterruptedIOException();
187 }
188 }
189 else
190 {
191 long filled = _parser.fill();
192 if (filled < 0)
193 {
194 close();
195 }
196 else
197 {
198
199 _parser.skipCRLF();
200 if (_parser.isMoreInBuffer())
201 {
202 LOG.warn("Unexpected data received but no request sent");
203 close();
204 }
205 }
206 return this;
207 }
208 }
209 }
210
211 try
212 {
213 if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
214 {
215 no_progress = 0;
216 commitRequest();
217 }
218
219 long io = 0;
220 _endp.flush();
221
222 if (_generator.isComplete())
223 {
224 if (!_requestComplete)
225 {
226 _requestComplete = true;
227 _exchange.getEventListener().onRequestComplete();
228 }
229 }
230 else
231 {
232
233 synchronized (this)
234 {
235 if (_exchange == null)
236 continue;
237 }
238
239 long flushed = _generator.flushBuffer();
240 io += flushed;
241
242 if (!_generator.isComplete())
243 {
244 if (_exchange!=null)
245 {
246 InputStream in = _exchange.getRequestContentSource();
247 if (in != null)
248 {
249 if (_requestContentChunk == null || _requestContentChunk.length() == 0)
250 {
251 _requestContentChunk = _exchange.getRequestContentChunk();
252
253 if (_requestContentChunk != null)
254 _generator.addContent(_requestContentChunk,false);
255 else
256 _generator.complete();
257
258 flushed = _generator.flushBuffer();
259 io += flushed;
260 }
261 }
262 else
263 _generator.complete();
264 }
265 else
266 _generator.complete();
267 }
268 }
269
270 if (_generator.isComplete() && !_requestComplete)
271 {
272 _requestComplete = true;
273 _exchange.getEventListener().onRequestComplete();
274 }
275
276
277 if (!_parser.isComplete() && (_generator.isComplete() || _generator.isCommitted() && !_endp.isBlocking()))
278 {
279 long filled = _parser.parseAvailable();
280 io += filled;
281
282 if (_parser.isIdle() && (_endp.isInputShutdown() || !_endp.isOpen()))
283 throw new EofException();
284 }
285
286 if (io > 0)
287 no_progress = 0;
288 else if (no_progress++ >= 1 && !_endp.isBlocking())
289 {
290
291 if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
292 {
293 long flushed = _generator.flushBuffer();
294 if (flushed>0)
295 continue;
296 }
297 return this;
298 }
299 }
300 catch (Throwable e)
301 {
302 LOG.debug("Failure on " + _exchange, e);
303
304 if (e instanceof ThreadDeath)
305 throw (ThreadDeath)e;
306
307 failed = true;
308
309 synchronized (this)
310 {
311 if (_exchange != null)
312 {
313
314
315 if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
316 _exchange.getStatus() != HttpExchange.STATUS_CANCELLED)
317 {
318 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
319 _exchange.getEventListener().onException(e);
320 }
321 }
322 else
323 {
324 if (e instanceof IOException)
325 throw (IOException)e;
326
327 if (e instanceof Error)
328 throw (Error)e;
329
330 if (e instanceof RuntimeException)
331 throw (RuntimeException)e;
332
333 throw new RuntimeException(e);
334 }
335 }
336 }
337 finally
338 {
339 boolean complete = false;
340 boolean close = failed;
341 if (!failed)
342 {
343
344 if (_generator.isComplete())
345 {
346 if (!_requestComplete)
347 {
348 _requestComplete = true;
349 _exchange.getEventListener().onRequestComplete();
350 }
351
352
353
354 if (_parser.isComplete())
355 {
356 _exchange.cancelTimeout(_destination.getHttpClient());
357 complete = true;
358 }
359 }
360
361
362 if (!_endp.isOpen() && !(_parser.isComplete()||_parser.isIdle()))
363 {
364
365 complete=true;
366 _parser.parseAvailable();
367
368 if (!(_parser.isComplete()||_parser.isIdle()))
369 {
370 LOG.warn("Incomplete {} {}",_parser,_endp);
371 if (_exchange!=null && !_exchange.isDone())
372 {
373 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
374 _exchange.getEventListener().onException(new EOFException("Incomplete"));
375 }
376 }
377 }
378 }
379
380 if (_endp.isInputShutdown() && !_parser.isComplete() && !_parser.isIdle())
381 {
382 if (_exchange!=null && !_exchange.isDone())
383 {
384 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
385 _exchange.getEventListener().onException(new EOFException("Incomplete"));
386 }
387 _endp.close();
388 }
389
390 if (complete || failed)
391 {
392 synchronized (this)
393 {
394 if (!close)
395 close = shouldClose();
396
397 reset(true);
398
399 no_progress = 0;
400 if (_exchange != null)
401 {
402 HttpExchange exchange=_exchange;
403 _exchange = null;
404
405
406 if (!close)
407 _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
408
409 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
410 {
411 Connection switched=exchange.onSwitchProtocol(_endp);
412 if (switched!=null)
413 {
414
415 exchange = _pipeline;
416 _pipeline = null;
417 if (exchange!=null)
418 _destination.send(exchange);
419
420 return switched;
421 }
422 }
423
424 if (_pipeline == null)
425 {
426 if (!isReserved())
427 _destination.returnConnection(this, close);
428 }
429 else
430 {
431 if (close)
432 {
433 if (!isReserved())
434 _destination.returnConnection(this,close);
435
436 exchange = _pipeline;
437 _pipeline = null;
438 _destination.send(exchange);
439 }
440 else
441 {
442 exchange = _pipeline;
443 _pipeline = null;
444 send(exchange);
445 }
446 }
447 }
448 }
449 }
450 }
451 }
452 }
453 finally
454 {
455 _parser.returnBuffers();
456
457
458 if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp.isOpen() && _endp instanceof AsyncEndPoint)
459 {
460
461 ((AsyncEndPoint)_endp).scheduleWrite();
462 }
463 }
464
465 return this;
466 }
467
468 public boolean isIdle()
469 {
470 synchronized (this)
471 {
472 return _exchange == null;
473 }
474 }
475
476 public boolean isSuspended()
477 {
478 return false;
479 }
480
481 public void closed()
482 {
483 }
484
485 private void commitRequest() throws IOException
486 {
487 synchronized (this)
488 {
489 _status=0;
490 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
491 throw new IllegalStateException();
492
493 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
494 _generator.setVersion(_exchange.getVersion());
495
496 String method=_exchange.getMethod();
497 String uri = _exchange.getURI();
498 if (_destination.isProxied() && !HttpMethods.CONNECT.equals(method) && uri.startsWith("/"))
499 {
500 boolean secure = _destination.isSecure();
501 String host = _destination.getAddress().getHost();
502 int port = _destination.getAddress().getPort();
503 StringBuilder absoluteURI = new StringBuilder();
504 absoluteURI.append(secure ? HttpSchemes.HTTPS : HttpSchemes.HTTP);
505 absoluteURI.append("://");
506 absoluteURI.append(host);
507
508 if (!(secure && port == 443 || !secure && port == 80))
509 absoluteURI.append(":").append(port);
510 absoluteURI.append(uri);
511 uri = absoluteURI.toString();
512 Authentication auth = _destination.getProxyAuthentication();
513 if (auth != null)
514 auth.setCredentials(_exchange);
515 }
516
517 _generator.setRequest(method, uri);
518 _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
519
520 HttpFields requestHeaders = _exchange.getRequestFields();
521 if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
522 {
523 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
524 requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
525 }
526
527 Buffer requestContent = _exchange.getRequestContent();
528 if (requestContent != null)
529 {
530 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
531 _generator.completeHeader(requestHeaders,false);
532 _generator.addContent(new View(requestContent),true);
533 }
534 else
535 {
536 InputStream requestContentStream = _exchange.getRequestContentSource();
537 if (requestContentStream != null)
538 {
539 _generator.completeHeader(requestHeaders, false);
540 int available = requestContentStream.available();
541 if (available > 0)
542 {
543
544
545 byte[] buf = new byte[available];
546 int length = requestContentStream.read(buf);
547 _generator.addContent(new ByteArrayBuffer(buf, 0, length), false);
548 }
549 }
550 else
551 {
552 requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
553 _generator.completeHeader(requestHeaders, true);
554 }
555 }
556
557 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
558 }
559 }
560
561 protected void reset(boolean returnBuffers) throws IOException
562 {
563 _requestComplete = false;
564 _connectionHeader = null;
565 _parser.reset();
566 if (returnBuffers)
567 _parser.returnBuffers();
568 _generator.reset(returnBuffers);
569 _http11 = true;
570 }
571
572 private boolean shouldClose()
573 {
574 if (_endp.isInputShutdown())
575 return true;
576 if (_connectionHeader!=null)
577 {
578 if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
579 return true;
580 if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
581 return false;
582 }
583 return !_http11;
584 }
585
586 private class Handler extends HttpParser.EventHandler
587 {
588 @Override
589 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
590 {
591
592
593
594
595
596 }
597
598 @Override
599 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
600 {
601 HttpExchange exchange = _exchange;
602 if (exchange!=null)
603 {
604 switch(status)
605 {
606 case HttpStatus.CONTINUE_100:
607 case HttpStatus.PROCESSING_102:
608
609 exchange.setEventListener(new NonFinalResponseListener(exchange));
610 break;
611
612 case HttpStatus.OK_200:
613
614 if (HttpMethods.CONNECT.equalsIgnoreCase(exchange.getMethod()))
615 _parser.setHeadResponse(true);
616 break;
617 }
618
619 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
620 _status=status;
621 exchange.getEventListener().onResponseStatus(version,status,reason);
622 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
623 }
624 }
625
626 @Override
627 public void parsedHeader(Buffer name, Buffer value) throws IOException
628 {
629 HttpExchange exchange = _exchange;
630 if (exchange!=null)
631 {
632 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
633 {
634 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
635 }
636 exchange.getEventListener().onResponseHeader(name,value);
637 }
638 }
639
640 @Override
641 public void headerComplete() throws IOException
642 {
643 if (_endp instanceof AsyncEndPoint)
644 ((AsyncEndPoint)_endp).scheduleIdle();
645 HttpExchange exchange = _exchange;
646 if (exchange!=null)
647 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
648 }
649
650 @Override
651 public void content(Buffer ref) throws IOException
652 {
653 if (_endp instanceof AsyncEndPoint)
654 ((AsyncEndPoint)_endp).scheduleIdle();
655 HttpExchange exchange = _exchange;
656 if (exchange!=null)
657 exchange.getEventListener().onResponseContent(ref);
658 }
659
660 @Override
661 public void messageComplete(long contextLength) throws IOException
662 {
663 HttpExchange exchange = _exchange;
664 if (exchange!=null)
665 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
666 }
667 }
668
669 @Override
670 public String toString()
671 {
672 return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
673 }
674
675 public String toDetailString()
676 {
677 return toString() + " ex=" + _exchange + " idle for " + _idleTimeout.getAge();
678 }
679
680 public void close() throws IOException
681 {
682
683
684
685 HttpExchange exchange = _exchange;
686 if (exchange != null && !exchange.isDone())
687 {
688 switch (exchange.getStatus())
689 {
690 case HttpExchange.STATUS_CANCELLED:
691 case HttpExchange.STATUS_CANCELLING:
692 case HttpExchange.STATUS_COMPLETED:
693 case HttpExchange.STATUS_EXCEPTED:
694 case HttpExchange.STATUS_EXPIRED:
695 break;
696 case HttpExchange.STATUS_PARSING_CONTENT:
697 if (_endp.isInputShutdown() && _parser.isState(HttpParser.STATE_EOF_CONTENT))
698 break;
699 default:
700 String exch= exchange.toString();
701 String reason = _endp.isOpen()?(_endp.isInputShutdown()?"half closed: ":"local close: "):"closed: ";
702 exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
703 exchange.getEventListener().onException(new EOFException(reason+exch));
704 }
705 }
706
707 _endp.close();
708 }
709
710 public void setIdleTimeout()
711 {
712 synchronized (this)
713 {
714 if (_idle.compareAndSet(false, true))
715 _destination.getHttpClient().scheduleIdle(_idleTimeout);
716 else
717 throw new IllegalStateException();
718 }
719 }
720
721 public boolean cancelIdleTimeout()
722 {
723 synchronized (this)
724 {
725 if (_idle.compareAndSet(true, false))
726 {
727 _destination.getHttpClient().cancel(_idleTimeout);
728 return true;
729 }
730 }
731
732 return false;
733 }
734
735 protected void exchangeExpired(HttpExchange exchange)
736 {
737 synchronized (this)
738 {
739
740
741 if (_exchange == exchange)
742 {
743 try
744 {
745 _destination.returnConnection(this, true);
746 }
747 catch (IOException x)
748 {
749 LOG.ignore(x);
750 }
751 }
752 }
753 }
754
755
756
757
758
759 public String dump()
760 {
761 return AggregateLifeCycle.dump(this);
762 }
763
764
765
766
767
768 public void dump(Appendable out, String indent) throws IOException
769 {
770 synchronized (this)
771 {
772 out.append(String.valueOf(this)).append("\n");
773 AggregateLifeCycle.dump(out,indent,Collections.singletonList(_endp));
774 }
775 }
776
777
778 private class ConnectionIdleTask extends Timeout.Task
779 {
780
781 @Override
782 public void expired()
783 {
784
785 if (_idle.compareAndSet(true, false))
786 {
787 _destination.returnIdleConnection(HttpConnection.this);
788 }
789 }
790 }
791
792
793
794 private class NonFinalResponseListener implements HttpEventListener
795 {
796 final HttpExchange _exchange;
797 final HttpEventListener _next;
798
799
800 public NonFinalResponseListener(HttpExchange exchange)
801 {
802 _exchange=exchange;
803 _next=exchange.getEventListener();
804 }
805
806
807 public void onRequestCommitted() throws IOException
808 {
809 }
810
811
812 public void onRequestComplete() throws IOException
813 {
814 }
815
816
817 public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
818 {
819 }
820
821
822 public void onResponseHeader(Buffer name, Buffer value) throws IOException
823 {
824 _next.onResponseHeader(name,value);
825 }
826
827
828 public void onResponseHeaderComplete() throws IOException
829 {
830 _next.onResponseHeaderComplete();
831 }
832
833
834 public void onResponseContent(Buffer content) throws IOException
835 {
836 }
837
838
839 public void onResponseComplete() throws IOException
840 {
841 _exchange.setEventListener(_next);
842 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
843 _parser.reset();
844 }
845
846
847 public void onConnectionFailed(Throwable ex)
848 {
849 _exchange.setEventListener(_next);
850 _next.onConnectionFailed(ex);
851 }
852
853
854 public void onException(Throwable ex)
855 {
856 _exchange.setEventListener(_next);
857 _next.onException(ex);
858 }
859
860
861 public void onExpire()
862 {
863 _exchange.setEventListener(_next);
864 _next.onExpire();
865 }
866
867
868 public void onRetry()
869 {
870 _exchange.setEventListener(_next);
871 _next.onRetry();
872 }
873 }
874 }