1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.EOFException;
17 import java.io.IOException;
18 import java.io.InputStream;
19 import java.io.InterruptedIOException;
20 import java.util.concurrent.atomic.AtomicBoolean;
21
22 import org.eclipse.jetty.client.security.Authentication;
23 import org.eclipse.jetty.http.HttpFields;
24 import org.eclipse.jetty.http.HttpGenerator;
25 import org.eclipse.jetty.http.HttpHeaderValues;
26 import org.eclipse.jetty.http.HttpHeaders;
27 import org.eclipse.jetty.http.HttpMethods;
28 import org.eclipse.jetty.http.HttpParser;
29 import org.eclipse.jetty.http.HttpSchemes;
30 import org.eclipse.jetty.http.HttpStatus;
31 import org.eclipse.jetty.http.HttpVersions;
32 import org.eclipse.jetty.io.AsyncEndPoint;
33 import org.eclipse.jetty.io.Buffer;
34 import org.eclipse.jetty.io.Buffers;
35 import org.eclipse.jetty.io.ByteArrayBuffer;
36 import org.eclipse.jetty.io.Connection;
37 import org.eclipse.jetty.io.EndPoint;
38 import org.eclipse.jetty.io.View;
39 import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.thread.Timeout;
42
43
44
45
46
47 public class HttpConnection implements Connection
48 {
49 private HttpDestination _destination;
50 private EndPoint _endp;
51 private HttpGenerator _generator;
52 private HttpParser _parser;
53 private boolean _http11 = true;
54 private int _status;
55 private Buffer _connectionHeader;
56 private Buffer _requestContentChunk;
57 private boolean _requestComplete;
58 private boolean _reserved;
59
60 private volatile HttpExchange _exchange;
61 private HttpExchange _pipeline;
62 private final Timeout.Task _timeout = new TimeoutTask();
63 private AtomicBoolean _idle = new AtomicBoolean(false);
64
65 public void dump() throws IOException
66 {
67 Log.info("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
68 Log.info("generator=" + _generator);
69 Log.info("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
70 Log.info("exchange=" + _exchange);
71 if (_endp instanceof SslSelectChannelEndPoint)
72 ((SslSelectChannelEndPoint)_endp).dump();
73 }
74
75 HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
76 {
77 _endp = endp;
78 _generator = new HttpGenerator(requestBuffers,endp);
79 _parser = new HttpParser(responseBuffers,endp,new Handler());
80 }
81
82 public long getTimeStamp()
83 {
84 return -1;
85 }
86
87 public void setReserved (boolean reserved)
88 {
89 _reserved = reserved;
90 }
91
92 public boolean isReserved()
93 {
94 return _reserved;
95 }
96
97 public HttpDestination getDestination()
98 {
99 return _destination;
100 }
101
102 public void setDestination(HttpDestination destination)
103 {
104 _destination = destination;
105 }
106
107 public boolean send(HttpExchange ex) throws IOException
108 {
109 synchronized (this)
110 {
111 if (_exchange != null)
112 {
113 if (_pipeline != null)
114 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange);
115 _pipeline = ex;
116 return true;
117 }
118
119 if (!_endp.isOpen())
120 return false;
121
122 _exchange = ex;
123 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
124
125 if (_endp.isBlocking())
126 {
127 this.notify();
128 }
129 else
130 {
131 AsyncEndPoint scep = (AsyncEndPoint)_endp;
132 scep.scheduleWrite();
133 }
134
135 long exchTimeout = _exchange.getTimeout();
136 if (exchTimeout > 0)
137 {
138 _destination.getHttpClient().schedule(_timeout, exchTimeout);
139 }
140 else
141 {
142 _destination.getHttpClient().schedule(_timeout);
143 }
144
145 return true;
146 }
147 }
148
149 public Connection handle() throws IOException
150 {
151 if (_exchange != null)
152 _exchange.associate(this);
153
154 try
155 {
156 int no_progress = 0;
157
158 boolean failed = false;
159 while (_endp.isBufferingInput() || _endp.isOpen())
160 {
161 synchronized (this)
162 {
163 while (_exchange == null)
164 {
165 if (_endp.isBlocking())
166 {
167 try
168 {
169 this.wait();
170 }
171 catch (InterruptedException e)
172 {
173 throw new InterruptedIOException();
174 }
175 }
176 else
177 {
178
179 _parser.fill();
180 _parser.skipCRLF();
181 if (_parser.isMoreInBuffer())
182 {
183 Log.warn("Unexpected data received but no request sent");
184 close();
185 }
186 return this;
187 }
188 }
189 if (!_exchange.isAssociated())
190 _exchange.associate(this);
191 }
192
193 try
194 {
195 if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
196 {
197 no_progress = 0;
198 commitRequest();
199 }
200
201 long io = 0;
202 _endp.flush();
203
204 if (_generator.isComplete())
205 {
206 if (!_requestComplete)
207 {
208 _requestComplete = true;
209 _exchange.getEventListener().onRequestComplete();
210 }
211 }
212 else
213 {
214
215 synchronized (this)
216 {
217 if (_exchange == null)
218 continue;
219 }
220
221 long flushed = _generator.flushBuffer();
222 io += flushed;
223
224 if (!_generator.isComplete())
225 {
226 if (_exchange!=null)
227 {
228 InputStream in = _exchange.getRequestContentSource();
229 if (in != null)
230 {
231 if (_requestContentChunk == null || _requestContentChunk.length() == 0)
232 {
233 _requestContentChunk = _exchange.getRequestContentChunk();
234 _destination.getHttpClient().schedule(_timeout);
235
236 if (_requestContentChunk != null)
237 _generator.addContent(_requestContentChunk,false);
238 else
239 _generator.complete();
240
241 flushed = _generator.flushBuffer();
242 io += flushed;
243 }
244 }
245 else
246 _generator.complete();
247 }
248 else
249 _generator.complete();
250 }
251 }
252
253 if (_generator.isComplete() && !_requestComplete)
254 {
255 _requestComplete = true;
256 _exchange.getEventListener().onRequestComplete();
257 }
258
259
260 if (!_parser.isComplete() && (_generator.isComplete() || _generator.isCommitted() && !_endp.isBlocking()))
261 {
262 long filled = _parser.parseAvailable();
263 io += filled;
264 }
265
266 if (io > 0)
267 no_progress = 0;
268 else if (no_progress++ >= 1 && !_endp.isBlocking())
269 {
270
271 if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
272 {
273 long flushed = _generator.flushBuffer();
274 if (flushed>0)
275 continue;
276 }
277 return this;
278 }
279 }
280 catch (Throwable e)
281 {
282 Log.debug("Failure on " + _exchange, e);
283
284 if (e instanceof ThreadDeath)
285 throw (ThreadDeath)e;
286
287 failed = true;
288
289 synchronized (this)
290 {
291 if (_exchange != null)
292 {
293
294
295 if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING)
296 {
297 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
298 _exchange.getEventListener().onException(e);
299 }
300 }
301 else
302 {
303 if (e instanceof IOException)
304 throw (IOException)e;
305
306 if (e instanceof Error)
307 throw (Error)e;
308
309 if (e instanceof RuntimeException)
310 throw (RuntimeException)e;
311
312 throw new RuntimeException(e);
313 }
314 }
315 }
316 finally
317 {
318 boolean complete = false;
319 boolean close = failed;
320 if (!failed)
321 {
322
323 if (_generator.isComplete())
324 {
325 if (!_requestComplete)
326 {
327 _requestComplete = true;
328 _exchange.getEventListener().onRequestComplete();
329 }
330
331
332
333 if (_parser.isComplete())
334 {
335 _destination.getHttpClient().cancel(_timeout);
336 complete = true;
337 }
338 }
339 }
340
341 if (complete || failed)
342 {
343 synchronized (this)
344 {
345 if (!close)
346 close = shouldClose();
347
348 reset(true);
349
350 no_progress = 0;
351 if (_exchange != null)
352 {
353 HttpExchange exchange=_exchange;
354 _exchange.disassociate();
355 _exchange = null;
356
357 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
358 {
359 Connection switched=exchange.onSwitchProtocol(_endp);
360 if (switched!=null)
361 {
362
363 exchange = _pipeline;
364 _pipeline = null;
365 if (exchange!=null)
366 _destination.send(exchange);
367
368 return switched;
369 }
370 }
371
372 if (_pipeline == null)
373 {
374 if (!isReserved())
375 _destination.returnConnection(this, close);
376 }
377 else
378 {
379 if (close)
380 {
381 if (!isReserved())
382 _destination.returnConnection(this,close);
383
384 exchange = _pipeline;
385 _pipeline = null;
386 _destination.send(exchange);
387 }
388 else
389 {
390 exchange = _pipeline;
391 _pipeline = null;
392 send(exchange);
393 }
394 }
395
396 }
397 }
398 }
399 }
400 }
401 }
402 finally
403 {
404 if (_exchange != null && _exchange.isAssociated())
405 {
406 _exchange.disassociate();
407 }
408
409
410 if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp instanceof AsyncEndPoint)
411 {
412
413 ((AsyncEndPoint)_endp).setWritable(false);
414 }
415 }
416
417 return this;
418 }
419
420 public boolean isIdle()
421 {
422 synchronized (this)
423 {
424 return _exchange == null;
425 }
426 }
427
428
429
430
431 public boolean isSuspended()
432 {
433 return false;
434 }
435
436 public void closed()
437 {
438 }
439
440 public EndPoint getEndPoint()
441 {
442 return _endp;
443 }
444
445 private void commitRequest() throws IOException
446 {
447 synchronized (this)
448 {
449 _status=0;
450 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
451 throw new IllegalStateException();
452
453 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
454 _generator.setVersion(_exchange.getVersion());
455
456 String method=_exchange.getMethod();
457 String uri = _exchange.getURI();
458 if (_destination.isProxied() && !HttpMethods.CONNECT.equals(method) && uri.startsWith("/"))
459 {
460 boolean secure = _destination.isSecure();
461 String host = _destination.getAddress().getHost();
462 int port = _destination.getAddress().getPort();
463 StringBuilder absoluteURI = new StringBuilder();
464 absoluteURI.append(secure ? HttpSchemes.HTTPS : HttpSchemes.HTTP);
465 absoluteURI.append("://");
466 absoluteURI.append(host);
467
468 if (!(secure && port == 443 || !secure && port == 80))
469 absoluteURI.append(":").append(port);
470 absoluteURI.append(uri);
471 uri = absoluteURI.toString();
472 Authentication auth = _destination.getProxyAuthentication();
473 if (auth != null)
474 auth.setCredentials(_exchange);
475 }
476
477 _generator.setRequest(method, uri);
478 _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
479
480 HttpFields requestHeaders = _exchange.getRequestFields();
481 if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
482 {
483 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
484 requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
485 }
486
487 Buffer requestContent = _exchange.getRequestContent();
488 if (requestContent != null)
489 {
490 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
491 _generator.completeHeader(requestHeaders,false);
492 _generator.addContent(new View(requestContent),true);
493 }
494 else
495 {
496 InputStream requestContentStream = _exchange.getRequestContentSource();
497 if (requestContentStream != null)
498 {
499 _generator.completeHeader(requestHeaders, false);
500 int available = requestContentStream.available();
501 if (available > 0)
502 {
503
504
505 byte[] buf = new byte[available];
506 int length = requestContentStream.read(buf);
507 _generator.addContent(new ByteArrayBuffer(buf, 0, length), false);
508 }
509 }
510 else
511 {
512 requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
513 _generator.completeHeader(requestHeaders, true);
514 }
515 }
516
517 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
518 }
519 }
520
521 protected void reset(boolean returnBuffers) throws IOException
522 {
523 _requestComplete = false;
524 _connectionHeader = null;
525 _parser.reset(returnBuffers);
526 _generator.reset(returnBuffers);
527 _http11 = true;
528 }
529
530 private boolean shouldClose()
531 {
532 if (_connectionHeader!=null)
533 {
534 if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
535 return true;
536 if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
537 return false;
538 }
539 return !_http11;
540 }
541
542 private class Handler extends HttpParser.EventHandler
543 {
544 @Override
545 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
546 {
547
548
549
550
551
552 }
553
554 @Override
555 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
556 {
557 HttpExchange exchange = _exchange;
558 if (exchange!=null)
559 {
560 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
561 _status=status;
562 exchange.getEventListener().onResponseStatus(version,status,reason);
563 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
564 }
565 }
566
567 @Override
568 public void parsedHeader(Buffer name, Buffer value) throws IOException
569 {
570 HttpExchange exchange = _exchange;
571 if (exchange!=null)
572 {
573 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
574 {
575 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
576 }
577 exchange.getEventListener().onResponseHeader(name,value);
578 }
579 }
580
581 @Override
582 public void headerComplete() throws IOException
583 {
584 if (_endp instanceof AsyncEndPoint)
585 ((AsyncEndPoint)_endp).scheduleIdle();
586 HttpExchange exchange = _exchange;
587 if (exchange!=null)
588 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
589 }
590
591 @Override
592 public void content(Buffer ref) throws IOException
593 {
594 if (_endp instanceof AsyncEndPoint)
595 ((AsyncEndPoint)_endp).scheduleIdle();
596 HttpExchange exchange = _exchange;
597 if (exchange!=null)
598 exchange.getEventListener().onResponseContent(ref);
599 }
600
601 @Override
602 public void messageComplete(long contextLength) throws IOException
603 {
604 HttpExchange exchange = _exchange;
605 if (exchange!=null)
606 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
607 }
608 }
609
610 @Override
611 public String toString()
612 {
613 return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
614 }
615
616 public String toDetailString()
617 {
618 return toString() + " ex=" + _exchange + " " + _timeout.getAge();
619 }
620
621 public void close() throws IOException
622 {
623
624
625
626 if (_exchange != null && !_exchange.isDone())
627 {
628 switch (_exchange.getStatus())
629 {
630 case HttpExchange.STATUS_CANCELLED:
631 case HttpExchange.STATUS_CANCELLING:
632 case HttpExchange.STATUS_COMPLETED:
633 case HttpExchange.STATUS_EXCEPTED:
634 case HttpExchange.STATUS_EXPIRED:
635 break;
636 default:
637 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
638 _exchange.getEventListener().onException(new EOFException("local close"));
639 }
640 }
641
642 _endp.close();
643 }
644
645 public void setIdleTimeout()
646 {
647 synchronized (this)
648 {
649 if (_idle.compareAndSet(false,true))
650 _destination.getHttpClient().scheduleIdle(_timeout);
651 else
652 throw new IllegalStateException();
653 }
654 }
655
656 public boolean cancelIdleTimeout()
657 {
658 synchronized (this)
659 {
660 if (_idle.compareAndSet(true,false))
661 {
662 _destination.getHttpClient().cancel(_timeout);
663 return true;
664 }
665 }
666
667 return false;
668 }
669
670 private class TimeoutTask extends Timeout.Task
671 {
672 @Override
673 public void expired()
674 {
675 HttpExchange ex = null;
676 try
677 {
678 synchronized (HttpConnection.this)
679 {
680 ex = _exchange;
681 _exchange = null;
682 if (ex != null)
683 {
684 ex.disassociate();
685 _destination.returnConnection(HttpConnection.this, true);
686 }
687 else if (_idle.compareAndSet(true,false))
688 {
689 _destination.returnIdleConnection(HttpConnection.this);
690 }
691 }
692 }
693 catch (Exception e)
694 {
695 Log.debug(e);
696 }
697 finally
698 {
699 try
700 {
701 close();
702 }
703 catch (IOException e)
704 {
705 Log.ignore(e);
706 }
707
708 if (ex != null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
709 {
710 ex.setStatus(HttpExchange.STATUS_EXPIRED);
711 }
712 }
713 }
714 }
715 }