1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.EOFException;
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.io.InterruptedIOException;
20 import java.util.Collections;
21 import java.util.concurrent.atomic.AtomicBoolean;
22
23 import org.eclipse.jetty.client.security.Authentication;
24 import org.eclipse.jetty.http.HttpFields;
25 import org.eclipse.jetty.http.HttpGenerator;
26 import org.eclipse.jetty.http.HttpHeaderValues;
27 import org.eclipse.jetty.http.HttpHeaders;
28 import org.eclipse.jetty.http.HttpMethods;
29 import org.eclipse.jetty.http.HttpParser;
30 import org.eclipse.jetty.http.HttpSchemes;
31 import org.eclipse.jetty.http.HttpStatus;
32 import org.eclipse.jetty.http.HttpVersions;
33 import org.eclipse.jetty.io.AbstractConnection;
34 import org.eclipse.jetty.io.AsyncEndPoint;
35 import org.eclipse.jetty.io.Buffer;
36 import org.eclipse.jetty.io.Buffers;
37 import org.eclipse.jetty.io.ByteArrayBuffer;
38 import org.eclipse.jetty.io.Connection;
39 import org.eclipse.jetty.io.EndPoint;
40 import org.eclipse.jetty.io.View;
41 import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
42 import org.eclipse.jetty.util.component.AggregateLifeCycle;
43 import org.eclipse.jetty.util.component.Dumpable;
44 import org.eclipse.jetty.util.log.Log;
45 import org.eclipse.jetty.util.log.Logger;
46 import org.eclipse.jetty.util.thread.Timeout;
47
48
49
50
51
52 public class HttpConnection extends AbstractConnection implements Dumpable
53 {
54 private static final Logger LOG = Log.getLogger(HttpConnection.class);
55
56 private HttpDestination _destination;
57 private HttpGenerator _generator;
58 private HttpParser _parser;
59 private boolean _http11 = true;
60 private int _status;
61 private Buffer _connectionHeader;
62 private Buffer _requestContentChunk;
63 private boolean _requestComplete;
64 private boolean _reserved;
65
66
67 private volatile HttpExchange _exchange;
68 private HttpExchange _pipeline;
69 private final Timeout.Task _idleTimeout = new ConnectionIdleTask();
70 private AtomicBoolean _idle = new AtomicBoolean(false);
71
72
73 HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
74 {
75 super(endp);
76
77 _generator = new HttpGenerator(requestBuffers,endp);
78 _parser = new HttpParser(responseBuffers,endp,new Handler());
79 }
80
81 public void setReserved (boolean reserved)
82 {
83 _reserved = reserved;
84 }
85
86 public boolean isReserved()
87 {
88 return _reserved;
89 }
90
91 public HttpDestination getDestination()
92 {
93 return _destination;
94 }
95
96 public void setDestination(HttpDestination destination)
97 {
98 _destination = destination;
99 }
100
101 public boolean send(HttpExchange ex) throws IOException
102 {
103 synchronized (this)
104 {
105 if (_exchange != null)
106 {
107 if (_pipeline != null)
108 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange);
109 _pipeline = ex;
110 return true;
111 }
112
113 _exchange = ex;
114 _exchange.associate(this);
115
116
117 if (!_endp.isOpen())
118 {
119 _exchange.disassociate();
120 _exchange = null;
121 return false;
122 }
123
124 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
125
126 if (_endp.isBlocking())
127 {
128 this.notify();
129 }
130 else
131 {
132 AsyncEndPoint scep = (AsyncEndPoint)_endp;
133 scep.scheduleWrite();
134 }
135
136 adjustIdleTimeout();
137
138 return true;
139 }
140 }
141
142 private void adjustIdleTimeout() throws IOException
143 {
144
145
146
147
148
149 long timeout = _exchange.getTimeout();
150 if (timeout <= 0)
151 timeout = _destination.getHttpClient().getTimeout();
152
153 long endPointTimeout = _endp.getMaxIdleTime();
154
155 if (timeout > 0 && timeout > endPointTimeout)
156 {
157
158
159
160 _endp.setMaxIdleTime(2 * (int)timeout);
161 }
162 }
163
164 public Connection handle() throws IOException
165 {
166 try
167 {
168 int no_progress = 0;
169
170 boolean failed = false;
171 while (_endp.isBufferingInput() || _endp.isOpen())
172 {
173 synchronized (this)
174 {
175 while (_exchange == null)
176 {
177 if (_endp.isBlocking())
178 {
179 try
180 {
181 this.wait();
182 }
183 catch (InterruptedException e)
184 {
185 throw new InterruptedIOException();
186 }
187 }
188 else
189 {
190 long filled = _parser.fill();
191 if (filled < 0)
192 {
193 close();
194 }
195 else
196 {
197
198 _parser.skipCRLF();
199 if (_parser.isMoreInBuffer())
200 {
201 LOG.warn("Unexpected data received but no request sent");
202 close();
203 }
204 }
205 return this;
206 }
207 }
208 }
209
210 try
211 {
212 if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
213 {
214 no_progress = 0;
215 commitRequest();
216 }
217
218 long io = 0;
219 _endp.flush();
220
221 if (_generator.isComplete())
222 {
223 if (!_requestComplete)
224 {
225 _requestComplete = true;
226 _exchange.getEventListener().onRequestComplete();
227 }
228 }
229 else
230 {
231
232 synchronized (this)
233 {
234 if (_exchange == null)
235 continue;
236 }
237
238 long flushed = _generator.flushBuffer();
239 io += flushed;
240
241 if (!_generator.isComplete())
242 {
243 if (_exchange!=null)
244 {
245 InputStream in = _exchange.getRequestContentSource();
246 if (in != null)
247 {
248 if (_requestContentChunk == null || _requestContentChunk.length() == 0)
249 {
250 _requestContentChunk = _exchange.getRequestContentChunk();
251
252 if (_requestContentChunk != null)
253 _generator.addContent(_requestContentChunk,false);
254 else
255 _generator.complete();
256
257 flushed = _generator.flushBuffer();
258 io += flushed;
259 }
260 }
261 else
262 _generator.complete();
263 }
264 else
265 _generator.complete();
266 }
267 }
268
269 if (_generator.isComplete() && !_requestComplete)
270 {
271 _requestComplete = true;
272 _exchange.getEventListener().onRequestComplete();
273 }
274
275
276 if (!_parser.isComplete() && (_generator.isComplete() || _generator.isCommitted() && !_endp.isBlocking()))
277 {
278 long filled = _parser.parseAvailable();
279 io += filled;
280 }
281
282 if (io > 0)
283 no_progress = 0;
284 else if (no_progress++ >= 1 && !_endp.isBlocking())
285 {
286
287 if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
288 {
289 long flushed = _generator.flushBuffer();
290 if (flushed>0)
291 continue;
292 }
293 return this;
294 }
295 }
296 catch (Throwable e)
297 {
298 LOG.debug("Failure on " + _exchange, e);
299
300 if (e instanceof ThreadDeath)
301 throw (ThreadDeath)e;
302
303 failed = true;
304
305 synchronized (this)
306 {
307 if (_exchange != null)
308 {
309
310
311 if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
312 _exchange.getStatus() != HttpExchange.STATUS_CANCELLED)
313 {
314 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
315 _exchange.getEventListener().onException(e);
316 }
317 }
318 else
319 {
320 if (e instanceof IOException)
321 throw (IOException)e;
322
323 if (e instanceof Error)
324 throw (Error)e;
325
326 if (e instanceof RuntimeException)
327 throw (RuntimeException)e;
328
329 throw new RuntimeException(e);
330 }
331 }
332 }
333 finally
334 {
335 boolean complete = false;
336 boolean close = failed;
337 if (!failed)
338 {
339
340 if (_generator.isComplete())
341 {
342 if (!_requestComplete)
343 {
344 _requestComplete = true;
345 _exchange.getEventListener().onRequestComplete();
346 }
347
348
349
350 if (_parser.isComplete())
351 {
352 _exchange.cancelTimeout(_destination.getHttpClient());
353 complete = true;
354 }
355 }
356 }
357
358 if (_generator.isComplete() && !_parser.isComplete())
359 {
360 if (!_endp.isOpen() || _endp.isInputShutdown())
361 {
362 complete=true;
363 close=true;
364 close();
365 }
366 }
367
368 if (complete || failed)
369 {
370 synchronized (this)
371 {
372 if (!close)
373 close = shouldClose();
374
375 reset(true);
376
377 no_progress = 0;
378 if (_exchange != null)
379 {
380 HttpExchange exchange=_exchange;
381 _exchange = null;
382
383
384 if (!close)
385 _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
386
387 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
388 {
389 Connection switched=exchange.onSwitchProtocol(_endp);
390 if (switched!=null)
391 {
392
393 exchange = _pipeline;
394 _pipeline = null;
395 if (exchange!=null)
396 _destination.send(exchange);
397
398 return switched;
399 }
400 }
401
402 if (_pipeline == null)
403 {
404 if (!isReserved())
405 _destination.returnConnection(this, close);
406 }
407 else
408 {
409 if (close)
410 {
411 if (!isReserved())
412 _destination.returnConnection(this,close);
413
414 exchange = _pipeline;
415 _pipeline = null;
416 _destination.send(exchange);
417 }
418 else
419 {
420 exchange = _pipeline;
421 _pipeline = null;
422 send(exchange);
423 }
424 }
425 }
426 }
427 }
428 }
429 }
430 }
431 finally
432 {
433 _parser.returnBuffers();
434
435
436 if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp instanceof AsyncEndPoint)
437 {
438
439 ((AsyncEndPoint)_endp).scheduleWrite();
440 }
441 }
442
443 return this;
444 }
445
446 public boolean isIdle()
447 {
448 synchronized (this)
449 {
450 return _exchange == null;
451 }
452 }
453
454 public boolean isSuspended()
455 {
456 return false;
457 }
458
459 public void closed()
460 {
461 }
462
463 private void commitRequest() throws IOException
464 {
465 synchronized (this)
466 {
467 _status=0;
468 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
469 throw new IllegalStateException();
470
471 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
472 _generator.setVersion(_exchange.getVersion());
473
474 String method=_exchange.getMethod();
475 String uri = _exchange.getURI();
476 if (_destination.isProxied() && !HttpMethods.CONNECT.equals(method) && uri.startsWith("/"))
477 {
478 boolean secure = _destination.isSecure();
479 String host = _destination.getAddress().getHost();
480 int port = _destination.getAddress().getPort();
481 StringBuilder absoluteURI = new StringBuilder();
482 absoluteURI.append(secure ? HttpSchemes.HTTPS : HttpSchemes.HTTP);
483 absoluteURI.append("://");
484 absoluteURI.append(host);
485
486 if (!(secure && port == 443 || !secure && port == 80))
487 absoluteURI.append(":").append(port);
488 absoluteURI.append(uri);
489 uri = absoluteURI.toString();
490 Authentication auth = _destination.getProxyAuthentication();
491 if (auth != null)
492 auth.setCredentials(_exchange);
493 }
494
495 _generator.setRequest(method, uri);
496 _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
497
498 HttpFields requestHeaders = _exchange.getRequestFields();
499 if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
500 {
501 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
502 requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
503 }
504
505 Buffer requestContent = _exchange.getRequestContent();
506 if (requestContent != null)
507 {
508 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
509 _generator.completeHeader(requestHeaders,false);
510 _generator.addContent(new View(requestContent),true);
511 }
512 else
513 {
514 InputStream requestContentStream = _exchange.getRequestContentSource();
515 if (requestContentStream != null)
516 {
517 _generator.completeHeader(requestHeaders, false);
518 int available = requestContentStream.available();
519 if (available > 0)
520 {
521
522
523 byte[] buf = new byte[available];
524 int length = requestContentStream.read(buf);
525 _generator.addContent(new ByteArrayBuffer(buf, 0, length), false);
526 }
527 }
528 else
529 {
530 requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
531 _generator.completeHeader(requestHeaders, true);
532 }
533 }
534
535 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
536 }
537 }
538
539 protected void reset(boolean returnBuffers) throws IOException
540 {
541 _requestComplete = false;
542 _connectionHeader = null;
543 _parser.reset();
544 if (returnBuffers)
545 _parser.returnBuffers();
546 _generator.reset(returnBuffers);
547 _http11 = true;
548 }
549
550 private boolean shouldClose()
551 {
552 if (_connectionHeader!=null)
553 {
554 if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
555 return true;
556 if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
557 return false;
558 }
559 return !_http11;
560 }
561
562 private class Handler extends HttpParser.EventHandler
563 {
564 @Override
565 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
566 {
567
568
569
570
571
572 }
573
574 @Override
575 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
576 {
577 HttpExchange exchange = _exchange;
578 if (exchange!=null)
579 {
580 switch(status)
581 {
582 case HttpStatus.CONTINUE_100:
583 case HttpStatus.PROCESSING_102:
584
585 exchange.setEventListener(new NonFinalResponseListener(exchange));
586 break;
587
588 case HttpStatus.OK_200:
589
590 if (HttpMethods.CONNECT.equalsIgnoreCase(exchange.getMethod()))
591 _parser.setHeadResponse(true);
592 break;
593 }
594
595 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
596 _status=status;
597 exchange.getEventListener().onResponseStatus(version,status,reason);
598 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
599 }
600 }
601
602 @Override
603 public void parsedHeader(Buffer name, Buffer value) throws IOException
604 {
605 HttpExchange exchange = _exchange;
606 if (exchange!=null)
607 {
608 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
609 {
610 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
611 }
612 exchange.getEventListener().onResponseHeader(name,value);
613 }
614 }
615
616 @Override
617 public void headerComplete() throws IOException
618 {
619 if (_endp instanceof AsyncEndPoint)
620 ((AsyncEndPoint)_endp).scheduleIdle();
621 HttpExchange exchange = _exchange;
622 if (exchange!=null)
623 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
624 }
625
626 @Override
627 public void content(Buffer ref) throws IOException
628 {
629 if (_endp instanceof AsyncEndPoint)
630 ((AsyncEndPoint)_endp).scheduleIdle();
631 HttpExchange exchange = _exchange;
632 if (exchange!=null)
633 exchange.getEventListener().onResponseContent(ref);
634 }
635
636 @Override
637 public void messageComplete(long contextLength) throws IOException
638 {
639 HttpExchange exchange = _exchange;
640 if (exchange!=null)
641 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
642 }
643 }
644
645 @Override
646 public String toString()
647 {
648 return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
649 }
650
651 public String toDetailString()
652 {
653 return toString() + " ex=" + _exchange + " idle for " + _idleTimeout.getAge();
654 }
655
656 public void close() throws IOException
657 {
658
659
660
661 HttpExchange exchange = _exchange;
662 if (exchange != null && !exchange.isDone())
663 {
664 switch (exchange.getStatus())
665 {
666 case HttpExchange.STATUS_CANCELLED:
667 case HttpExchange.STATUS_CANCELLING:
668 case HttpExchange.STATUS_COMPLETED:
669 case HttpExchange.STATUS_EXCEPTED:
670 case HttpExchange.STATUS_EXPIRED:
671 break;
672 default:
673 String exch= exchange.toString();
674 String reason = _endp.isOpen()?(_endp.isInputShutdown()?"half closed: ":"local close: "):"closed: ";
675 exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
676 exchange.getEventListener().onException(new EOFException(reason+exch));
677 }
678 }
679
680 _endp.close();
681 }
682
683 public void setIdleTimeout()
684 {
685 synchronized (this)
686 {
687 if (_idle.compareAndSet(false, true))
688 _destination.getHttpClient().scheduleIdle(_idleTimeout);
689 else
690 throw new IllegalStateException();
691 }
692 }
693
694 public boolean cancelIdleTimeout()
695 {
696 synchronized (this)
697 {
698 if (_idle.compareAndSet(true, false))
699 {
700 _destination.getHttpClient().cancel(_idleTimeout);
701 return true;
702 }
703 }
704
705 return false;
706 }
707
708 protected void exchangeExpired(HttpExchange exchange)
709 {
710 synchronized (this)
711 {
712
713
714 if (_exchange == exchange)
715 {
716 try
717 {
718 _destination.returnConnection(this, true);
719 }
720 catch (IOException x)
721 {
722 LOG.ignore(x);
723 }
724 }
725 }
726 }
727
728
729
730
731
732 public String dump()
733 {
734 return AggregateLifeCycle.dump(this);
735 }
736
737
738
739
740
741 public void dump(Appendable out, String indent) throws IOException
742 {
743 synchronized (this)
744 {
745 out.append(String.valueOf(this)).append("\n");
746 AggregateLifeCycle.dump(out,indent,Collections.singletonList(_endp));
747 }
748 }
749
750
751 private class ConnectionIdleTask extends Timeout.Task
752 {
753
754 @Override
755 public void expired()
756 {
757
758 if (_idle.compareAndSet(true, false))
759 {
760 _destination.returnIdleConnection(HttpConnection.this);
761 }
762 }
763 }
764
765
766
767 private class NonFinalResponseListener implements HttpEventListener
768 {
769 final HttpExchange _exchange;
770 final HttpEventListener _next;
771
772
773 public NonFinalResponseListener(HttpExchange exchange)
774 {
775 _exchange=exchange;
776 _next=exchange.getEventListener();
777 }
778
779
780 public void onRequestCommitted() throws IOException
781 {
782 }
783
784
785 public void onRequestComplete() throws IOException
786 {
787 }
788
789
790 public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
791 {
792 }
793
794
795 public void onResponseHeader(Buffer name, Buffer value) throws IOException
796 {
797 _next.onResponseHeader(name,value);
798 }
799
800
801 public void onResponseHeaderComplete() throws IOException
802 {
803 _next.onResponseHeaderComplete();
804 }
805
806
807 public void onResponseContent(Buffer content) throws IOException
808 {
809 }
810
811
812 public void onResponseComplete() throws IOException
813 {
814 _exchange.setEventListener(_next);
815 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
816 _parser.reset();
817 }
818
819
820 public void onConnectionFailed(Throwable ex)
821 {
822 _exchange.setEventListener(_next);
823 _next.onConnectionFailed(ex);
824 }
825
826
827 public void onException(Throwable ex)
828 {
829 _exchange.setEventListener(_next);
830 _next.onException(ex);
831 }
832
833
834 public void onExpire()
835 {
836 _exchange.setEventListener(_next);
837 _next.onExpire();
838 }
839
840
841 public void onRetry()
842 {
843 _exchange.setEventListener(_next);
844 _next.onRetry();
845 }
846 }
847 }