1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.EOFException;
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.io.InterruptedIOException;
20 import java.util.Collections;
21 import java.util.concurrent.atomic.AtomicBoolean;
22
23 import org.eclipse.jetty.client.security.Authentication;
24 import org.eclipse.jetty.http.HttpFields;
25 import org.eclipse.jetty.http.HttpGenerator;
26 import org.eclipse.jetty.http.HttpHeaderValues;
27 import org.eclipse.jetty.http.HttpHeaders;
28 import org.eclipse.jetty.http.HttpMethods;
29 import org.eclipse.jetty.http.HttpParser;
30 import org.eclipse.jetty.http.HttpSchemes;
31 import org.eclipse.jetty.http.HttpStatus;
32 import org.eclipse.jetty.http.HttpVersions;
33 import org.eclipse.jetty.io.AbstractConnection;
34 import org.eclipse.jetty.io.AsyncEndPoint;
35 import org.eclipse.jetty.io.Buffer;
36 import org.eclipse.jetty.io.Buffers;
37 import org.eclipse.jetty.io.ByteArrayBuffer;
38 import org.eclipse.jetty.io.Connection;
39 import org.eclipse.jetty.io.EndPoint;
40 import org.eclipse.jetty.io.View;
41 import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
42 import org.eclipse.jetty.util.component.AggregateLifeCycle;
43 import org.eclipse.jetty.util.component.Dumpable;
44 import org.eclipse.jetty.util.log.Log;
45 import org.eclipse.jetty.util.log.Logger;
46 import org.eclipse.jetty.util.thread.Timeout;
47
48
49
50
51
52 public class HttpConnection extends AbstractConnection implements Dumpable
53 {
54 private static final Logger LOG = Log.getLogger(HttpConnection.class);
55
56 private HttpDestination _destination;
57 private HttpGenerator _generator;
58 private HttpParser _parser;
59 private boolean _http11 = true;
60 private int _status;
61 private Buffer _connectionHeader;
62 private Buffer _requestContentChunk;
63 private boolean _requestComplete;
64 private boolean _reserved;
65
66
67 private volatile HttpExchange _exchange;
68 private HttpExchange _pipeline;
69 private final Timeout.Task _idleTimeout = new ConnectionIdleTask();
70 private AtomicBoolean _idle = new AtomicBoolean(false);
71
72
73 HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
74 {
75 super(endp);
76
77 _generator = new HttpGenerator(requestBuffers,endp);
78 _parser = new HttpParser(responseBuffers,endp,new Handler());
79 }
80
81 public void setReserved (boolean reserved)
82 {
83 _reserved = reserved;
84 }
85
86 public boolean isReserved()
87 {
88 return _reserved;
89 }
90
91 public HttpDestination getDestination()
92 {
93 return _destination;
94 }
95
96 public void setDestination(HttpDestination destination)
97 {
98 _destination = destination;
99 }
100
101 public boolean send(HttpExchange ex) throws IOException
102 {
103 synchronized (this)
104 {
105 if (_exchange != null)
106 {
107 if (_pipeline != null)
108 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange);
109 _pipeline = ex;
110 return true;
111 }
112
113 _exchange = ex;
114 _exchange.associate(this);
115
116
117 if (!_endp.isOpen())
118 {
119 _exchange.disassociate();
120 _exchange = null;
121 return false;
122 }
123
124 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
125
126 if (_endp.isBlocking())
127 {
128 this.notify();
129 }
130 else
131 {
132 AsyncEndPoint scep = (AsyncEndPoint)_endp;
133 scep.scheduleWrite();
134 }
135
136 adjustIdleTimeout();
137
138 return true;
139 }
140 }
141
142 private void adjustIdleTimeout() throws IOException
143 {
144
145
146
147
148
149 long timeout = _exchange.getTimeout();
150 if (timeout <= 0)
151 timeout = _destination.getHttpClient().getTimeout();
152
153 long endPointTimeout = _endp.getMaxIdleTime();
154
155 if (timeout > 0 && timeout > endPointTimeout)
156 {
157
158
159
160 _endp.setMaxIdleTime(2 * (int)timeout);
161 }
162 }
163
164 public Connection handle() throws IOException
165 {
166 try
167 {
168 int no_progress = 0;
169
170 boolean failed = false;
171 while (_endp.isBufferingInput() || _endp.isOpen())
172 {
173 synchronized (this)
174 {
175 while (_exchange == null)
176 {
177 if (_endp.isBlocking())
178 {
179 try
180 {
181 this.wait();
182 }
183 catch (InterruptedException e)
184 {
185 throw new InterruptedIOException();
186 }
187 }
188 else
189 {
190 long filled = _parser.fill();
191 if (filled < 0)
192 {
193 close();
194 }
195 else
196 {
197
198 _parser.skipCRLF();
199 if (_parser.isMoreInBuffer())
200 {
201 LOG.warn("Unexpected data received but no request sent");
202 close();
203 }
204 }
205 return this;
206 }
207 }
208 }
209
210 try
211 {
212 if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
213 {
214 no_progress = 0;
215 commitRequest();
216 }
217
218 long io = 0;
219 _endp.flush();
220
221 if (_generator.isComplete())
222 {
223 if (!_requestComplete)
224 {
225 _requestComplete = true;
226 _exchange.getEventListener().onRequestComplete();
227 }
228 }
229 else
230 {
231
232 synchronized (this)
233 {
234 if (_exchange == null)
235 continue;
236 }
237
238 long flushed = _generator.flushBuffer();
239 io += flushed;
240
241 if (!_generator.isComplete())
242 {
243 if (_exchange!=null)
244 {
245 InputStream in = _exchange.getRequestContentSource();
246 if (in != null)
247 {
248 if (_requestContentChunk == null || _requestContentChunk.length() == 0)
249 {
250 _requestContentChunk = _exchange.getRequestContentChunk();
251
252 if (_requestContentChunk != null)
253 _generator.addContent(_requestContentChunk,false);
254 else
255 _generator.complete();
256
257 flushed = _generator.flushBuffer();
258 io += flushed;
259 }
260 }
261 else
262 _generator.complete();
263 }
264 else
265 _generator.complete();
266 }
267 }
268
269 if (_generator.isComplete() && !_requestComplete)
270 {
271 _requestComplete = true;
272 _exchange.getEventListener().onRequestComplete();
273 }
274
275
276 if (!_parser.isComplete() && (_generator.isComplete() || _generator.isCommitted() && !_endp.isBlocking()))
277 {
278 long filled = _parser.parseAvailable();
279 io += filled;
280
281 if (_parser.isIdle() && (_endp.isInputShutdown() || !_endp.isOpen()))
282 throw new EOFException();
283 }
284
285 if (io > 0)
286 no_progress = 0;
287 else if (no_progress++ >= 1 && !_endp.isBlocking())
288 {
289
290 if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
291 {
292 long flushed = _generator.flushBuffer();
293 if (flushed>0)
294 continue;
295 }
296 return this;
297 }
298 }
299 catch (Throwable e)
300 {
301 LOG.debug("Failure on " + _exchange, e);
302
303 if (e instanceof ThreadDeath)
304 throw (ThreadDeath)e;
305
306 failed = true;
307
308 synchronized (this)
309 {
310 if (_exchange != null)
311 {
312
313
314 if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
315 _exchange.getStatus() != HttpExchange.STATUS_CANCELLED)
316 {
317 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
318 _exchange.getEventListener().onException(e);
319 }
320 }
321 else
322 {
323 if (e instanceof IOException)
324 throw (IOException)e;
325
326 if (e instanceof Error)
327 throw (Error)e;
328
329 if (e instanceof RuntimeException)
330 throw (RuntimeException)e;
331
332 throw new RuntimeException(e);
333 }
334 }
335 }
336 finally
337 {
338 boolean complete = false;
339 boolean close = failed;
340 if (!failed)
341 {
342
343 if (_generator.isComplete())
344 {
345 if (!_requestComplete)
346 {
347 _requestComplete = true;
348 _exchange.getEventListener().onRequestComplete();
349 }
350
351
352
353 if (_parser.isComplete())
354 {
355 _exchange.cancelTimeout(_destination.getHttpClient());
356 complete = true;
357 }
358 }
359
360
361 if (!_endp.isOpen() && !(_parser.isComplete()||_parser.isIdle()))
362 {
363
364 complete=true;
365 _parser.parseAvailable();
366
367 if (!(_parser.isComplete()||_parser.isIdle()))
368 {
369 LOG.warn("Incomplete {} {}",_parser,_endp);
370 if (_exchange!=null && !_exchange.isDone())
371 {
372 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
373 _exchange.getEventListener().onException(new EOFException("Incomplete"));
374 }
375 }
376 }
377 }
378
379 if (_endp.isInputShutdown() && !_parser.isComplete() && !_parser.isIdle())
380 {
381 if (_exchange!=null && !_exchange.isDone())
382 {
383 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
384 _exchange.getEventListener().onException(new EOFException("Incomplete"));
385 }
386 _endp.close();
387 }
388
389 if (complete || failed)
390 {
391 synchronized (this)
392 {
393 if (!close)
394 close = shouldClose();
395
396 reset(true);
397
398 no_progress = 0;
399 if (_exchange != null)
400 {
401 HttpExchange exchange=_exchange;
402 _exchange = null;
403
404
405 if (!close)
406 _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
407
408 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
409 {
410 Connection switched=exchange.onSwitchProtocol(_endp);
411 if (switched!=null)
412 {
413
414 exchange = _pipeline;
415 _pipeline = null;
416 if (exchange!=null)
417 _destination.send(exchange);
418
419 return switched;
420 }
421 }
422
423 if (_pipeline == null)
424 {
425 if (!isReserved())
426 _destination.returnConnection(this, close);
427 }
428 else
429 {
430 if (close)
431 {
432 if (!isReserved())
433 _destination.returnConnection(this,close);
434
435 exchange = _pipeline;
436 _pipeline = null;
437 _destination.send(exchange);
438 }
439 else
440 {
441 exchange = _pipeline;
442 _pipeline = null;
443 send(exchange);
444 }
445 }
446 }
447 }
448 }
449 }
450 }
451 }
452 finally
453 {
454 _parser.returnBuffers();
455
456
457 if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp.isOpen() && _endp instanceof AsyncEndPoint)
458 {
459
460 ((AsyncEndPoint)_endp).scheduleWrite();
461 }
462 }
463
464 return this;
465 }
466
467 public boolean isIdle()
468 {
469 synchronized (this)
470 {
471 return _exchange == null;
472 }
473 }
474
475 public boolean isSuspended()
476 {
477 return false;
478 }
479
480 public void closed()
481 {
482 }
483
484 private void commitRequest() throws IOException
485 {
486 synchronized (this)
487 {
488 _status=0;
489 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
490 throw new IllegalStateException();
491
492 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
493 _generator.setVersion(_exchange.getVersion());
494
495 String method=_exchange.getMethod();
496 String uri = _exchange.getURI();
497 if (_destination.isProxied() && !HttpMethods.CONNECT.equals(method) && uri.startsWith("/"))
498 {
499 boolean secure = _destination.isSecure();
500 String host = _destination.getAddress().getHost();
501 int port = _destination.getAddress().getPort();
502 StringBuilder absoluteURI = new StringBuilder();
503 absoluteURI.append(secure ? HttpSchemes.HTTPS : HttpSchemes.HTTP);
504 absoluteURI.append("://");
505 absoluteURI.append(host);
506
507 if (!(secure && port == 443 || !secure && port == 80))
508 absoluteURI.append(":").append(port);
509 absoluteURI.append(uri);
510 uri = absoluteURI.toString();
511 Authentication auth = _destination.getProxyAuthentication();
512 if (auth != null)
513 auth.setCredentials(_exchange);
514 }
515
516 _generator.setRequest(method, uri);
517 _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
518
519 HttpFields requestHeaders = _exchange.getRequestFields();
520 if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
521 {
522 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
523 requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
524 }
525
526 Buffer requestContent = _exchange.getRequestContent();
527 if (requestContent != null)
528 {
529 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
530 _generator.completeHeader(requestHeaders,false);
531 _generator.addContent(new View(requestContent),true);
532 }
533 else
534 {
535 InputStream requestContentStream = _exchange.getRequestContentSource();
536 if (requestContentStream != null)
537 {
538 _generator.completeHeader(requestHeaders, false);
539 int available = requestContentStream.available();
540 if (available > 0)
541 {
542
543
544 byte[] buf = new byte[available];
545 int length = requestContentStream.read(buf);
546 _generator.addContent(new ByteArrayBuffer(buf, 0, length), false);
547 }
548 }
549 else
550 {
551 requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
552 _generator.completeHeader(requestHeaders, true);
553 }
554 }
555
556 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
557 }
558 }
559
560 protected void reset(boolean returnBuffers) throws IOException
561 {
562 _requestComplete = false;
563 _connectionHeader = null;
564 _parser.reset();
565 if (returnBuffers)
566 _parser.returnBuffers();
567 _generator.reset(returnBuffers);
568 _http11 = true;
569 }
570
571 private boolean shouldClose()
572 {
573 if (_endp.isInputShutdown())
574 return true;
575 if (_connectionHeader!=null)
576 {
577 if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
578 return true;
579 if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
580 return false;
581 }
582 return !_http11;
583 }
584
585 private class Handler extends HttpParser.EventHandler
586 {
587 @Override
588 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
589 {
590
591
592
593
594
595 }
596
597 @Override
598 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
599 {
600 HttpExchange exchange = _exchange;
601 if (exchange!=null)
602 {
603 switch(status)
604 {
605 case HttpStatus.CONTINUE_100:
606 case HttpStatus.PROCESSING_102:
607
608 exchange.setEventListener(new NonFinalResponseListener(exchange));
609 break;
610
611 case HttpStatus.OK_200:
612
613 if (HttpMethods.CONNECT.equalsIgnoreCase(exchange.getMethod()))
614 _parser.setHeadResponse(true);
615 break;
616 }
617
618 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
619 _status=status;
620 exchange.getEventListener().onResponseStatus(version,status,reason);
621 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
622 }
623 }
624
625 @Override
626 public void parsedHeader(Buffer name, Buffer value) throws IOException
627 {
628 HttpExchange exchange = _exchange;
629 if (exchange!=null)
630 {
631 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
632 {
633 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
634 }
635 exchange.getEventListener().onResponseHeader(name,value);
636 }
637 }
638
639 @Override
640 public void headerComplete() throws IOException
641 {
642 if (_endp instanceof AsyncEndPoint)
643 ((AsyncEndPoint)_endp).scheduleIdle();
644 HttpExchange exchange = _exchange;
645 if (exchange!=null)
646 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
647 }
648
649 @Override
650 public void content(Buffer ref) throws IOException
651 {
652 if (_endp instanceof AsyncEndPoint)
653 ((AsyncEndPoint)_endp).scheduleIdle();
654 HttpExchange exchange = _exchange;
655 if (exchange!=null)
656 exchange.getEventListener().onResponseContent(ref);
657 }
658
659 @Override
660 public void messageComplete(long contextLength) throws IOException
661 {
662 HttpExchange exchange = _exchange;
663 if (exchange!=null)
664 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
665 }
666 }
667
668 @Override
669 public String toString()
670 {
671 return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
672 }
673
674 public String toDetailString()
675 {
676 return toString() + " ex=" + _exchange + " idle for " + _idleTimeout.getAge();
677 }
678
679 public void close() throws IOException
680 {
681
682
683
684 HttpExchange exchange = _exchange;
685 if (exchange != null && !exchange.isDone())
686 {
687 switch (exchange.getStatus())
688 {
689 case HttpExchange.STATUS_CANCELLED:
690 case HttpExchange.STATUS_CANCELLING:
691 case HttpExchange.STATUS_COMPLETED:
692 case HttpExchange.STATUS_EXCEPTED:
693 case HttpExchange.STATUS_EXPIRED:
694 break;
695 case HttpExchange.STATUS_PARSING_CONTENT:
696 if (_endp.isInputShutdown() && _parser.isState(HttpParser.STATE_EOF_CONTENT))
697 break;
698 default:
699 String exch= exchange.toString();
700 String reason = _endp.isOpen()?(_endp.isInputShutdown()?"half closed: ":"local close: "):"closed: ";
701 exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
702 exchange.getEventListener().onException(new EOFException(reason+exch));
703 }
704 }
705
706 _endp.close();
707 }
708
709 public void setIdleTimeout()
710 {
711 synchronized (this)
712 {
713 if (_idle.compareAndSet(false, true))
714 _destination.getHttpClient().scheduleIdle(_idleTimeout);
715 else
716 throw new IllegalStateException();
717 }
718 }
719
720 public boolean cancelIdleTimeout()
721 {
722 synchronized (this)
723 {
724 if (_idle.compareAndSet(true, false))
725 {
726 _destination.getHttpClient().cancel(_idleTimeout);
727 return true;
728 }
729 }
730
731 return false;
732 }
733
734 protected void exchangeExpired(HttpExchange exchange)
735 {
736 synchronized (this)
737 {
738
739
740 if (_exchange == exchange)
741 {
742 try
743 {
744 _destination.returnConnection(this, true);
745 }
746 catch (IOException x)
747 {
748 LOG.ignore(x);
749 }
750 }
751 }
752 }
753
754
755
756
757
758 public String dump()
759 {
760 return AggregateLifeCycle.dump(this);
761 }
762
763
764
765
766
767 public void dump(Appendable out, String indent) throws IOException
768 {
769 synchronized (this)
770 {
771 out.append(String.valueOf(this)).append("\n");
772 AggregateLifeCycle.dump(out,indent,Collections.singletonList(_endp));
773 }
774 }
775
776
777 private class ConnectionIdleTask extends Timeout.Task
778 {
779
780 @Override
781 public void expired()
782 {
783
784 if (_idle.compareAndSet(true, false))
785 {
786 _destination.returnIdleConnection(HttpConnection.this);
787 }
788 }
789 }
790
791
792
793 private class NonFinalResponseListener implements HttpEventListener
794 {
795 final HttpExchange _exchange;
796 final HttpEventListener _next;
797
798
799 public NonFinalResponseListener(HttpExchange exchange)
800 {
801 _exchange=exchange;
802 _next=exchange.getEventListener();
803 }
804
805
806 public void onRequestCommitted() throws IOException
807 {
808 }
809
810
811 public void onRequestComplete() throws IOException
812 {
813 }
814
815
816 public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
817 {
818 }
819
820
821 public void onResponseHeader(Buffer name, Buffer value) throws IOException
822 {
823 _next.onResponseHeader(name,value);
824 }
825
826
827 public void onResponseHeaderComplete() throws IOException
828 {
829 _next.onResponseHeaderComplete();
830 }
831
832
833 public void onResponseContent(Buffer content) throws IOException
834 {
835 }
836
837
838 public void onResponseComplete() throws IOException
839 {
840 _exchange.setEventListener(_next);
841 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
842 _parser.reset();
843 }
844
845
846 public void onConnectionFailed(Throwable ex)
847 {
848 _exchange.setEventListener(_next);
849 _next.onConnectionFailed(ex);
850 }
851
852
853 public void onException(Throwable ex)
854 {
855 _exchange.setEventListener(_next);
856 _next.onException(ex);
857 }
858
859
860 public void onExpire()
861 {
862 _exchange.setEventListener(_next);
863 _next.onExpire();
864 }
865
866
867 public void onRetry()
868 {
869 _exchange.setEventListener(_next);
870 _next.onRetry();
871 }
872 }
873 }