1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.InterruptedIOException;
19 import java.util.concurrent.atomic.AtomicBoolean;
20
21 import org.eclipse.jetty.client.security.Authentication;
22 import org.eclipse.jetty.http.HttpFields;
23 import org.eclipse.jetty.http.HttpGenerator;
24 import org.eclipse.jetty.http.HttpHeaderValues;
25 import org.eclipse.jetty.http.HttpHeaders;
26 import org.eclipse.jetty.http.HttpMethods;
27 import org.eclipse.jetty.http.HttpParser;
28 import org.eclipse.jetty.http.HttpSchemes;
29 import org.eclipse.jetty.http.HttpStatus;
30 import org.eclipse.jetty.http.HttpVersions;
31 import org.eclipse.jetty.io.AsyncEndPoint;
32 import org.eclipse.jetty.io.Buffer;
33 import org.eclipse.jetty.io.Buffers;
34 import org.eclipse.jetty.io.ByteArrayBuffer;
35 import org.eclipse.jetty.io.Connection;
36 import org.eclipse.jetty.io.EndPoint;
37 import org.eclipse.jetty.io.View;
38 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
39 import org.eclipse.jetty.io.nio.SslSelectChannelEndPoint;
40 import org.eclipse.jetty.util.log.Log;
41 import org.eclipse.jetty.util.thread.Timeout;
42
43
44
45
46
47 public class HttpConnection implements Connection
48 {
49 private HttpDestination _destination;
50 private EndPoint _endp;
51 private HttpGenerator _generator;
52 private HttpParser _parser;
53 private boolean _http11 = true;
54 private int _status;
55 private Buffer _connectionHeader;
56 private Buffer _requestContentChunk;
57 private boolean _requestComplete;
58 private boolean _reserved;
59
60 private volatile HttpExchange _exchange;
61 private HttpExchange _pipeline;
62 private final Timeout.Task _timeout = new TimeoutTask();
63 private AtomicBoolean _idle = new AtomicBoolean(false);
64
65 public void dump() throws IOException
66 {
67 Log.info("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
68 Log.info("generator=" + _generator);
69 Log.info("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
70 Log.info("exchange=" + _exchange);
71 if (_endp instanceof SslSelectChannelEndPoint)
72 ((SslSelectChannelEndPoint)_endp).dump();
73 }
74
75 HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
76 {
77 _endp = endp;
78 _generator = new HttpGenerator(requestBuffers,endp);
79 _parser = new HttpParser(responseBuffers,endp,new Handler());
80 }
81
82 public long getTimeStamp()
83 {
84 return -1;
85 }
86
87 public void setReserved (boolean reserved)
88 {
89 _reserved = reserved;
90 }
91
92 public boolean isReserved()
93 {
94 return _reserved;
95 }
96
97 public HttpDestination getDestination()
98 {
99 return _destination;
100 }
101
102 public void setDestination(HttpDestination destination)
103 {
104 _destination = destination;
105 }
106
107 public boolean send(HttpExchange ex) throws IOException
108 {
109 synchronized (this)
110 {
111 if (_exchange != null)
112 {
113 if (_pipeline != null)
114 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange);
115 _pipeline = ex;
116 return true;
117 }
118
119 if (!_endp.isOpen())
120 return false;
121
122 _exchange = ex;
123 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
124
125 if (_endp.isBlocking())
126 {
127 this.notify();
128 }
129 else
130 {
131 AsyncEndPoint scep = (AsyncEndPoint)_endp;
132 scep.scheduleWrite();
133 }
134 _destination.getHttpClient().schedule(_timeout);
135
136 return true;
137 }
138 }
139
140 public Connection handle() throws IOException
141 {
142 if (_exchange != null)
143 _exchange.associate(this);
144
145 try
146 {
147 int no_progress = 0;
148
149 boolean failed = false;
150 while (_endp.isBufferingInput() || _endp.isOpen())
151 {
152 synchronized (this)
153 {
154 while (_exchange == null)
155 {
156 if (_endp.isBlocking())
157 {
158 try
159 {
160 this.wait();
161 }
162 catch (InterruptedException e)
163 {
164 throw new InterruptedIOException();
165 }
166 }
167 else
168 {
169
170 _parser.fill();
171 _parser.skipCRLF();
172 if (_parser.isMoreInBuffer())
173 {
174 Log.warn("Unexpected data received but no request sent");
175 close();
176 }
177 return this;
178 }
179 }
180 if (!_exchange.isAssociated())
181 _exchange.associate(this);
182 }
183
184 if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
185 {
186 no_progress = 0;
187 commitRequest();
188 }
189
190 try
191 {
192 long io = 0;
193 _endp.flush();
194
195 if (_generator.isComplete())
196 {
197 if (!_requestComplete)
198 {
199 _requestComplete = true;
200 _exchange.getEventListener().onRequestComplete();
201 }
202 }
203 else
204 {
205
206 synchronized (this)
207 {
208 if (_exchange == null)
209 continue;
210 }
211
212 long flushed = _generator.flushBuffer();
213 io += flushed;
214
215 if (!_generator.isComplete())
216 {
217 if (_exchange!=null)
218 {
219 InputStream in = _exchange.getRequestContentSource();
220 if (in != null)
221 {
222 if (_requestContentChunk == null || _requestContentChunk.length() == 0)
223 {
224 _requestContentChunk = _exchange.getRequestContentChunk();
225 _destination.getHttpClient().schedule(_timeout);
226
227 if (_requestContentChunk != null)
228 _generator.addContent(_requestContentChunk,false);
229 else
230 _generator.complete();
231
232 flushed = _generator.flushBuffer();
233 io += flushed;
234 }
235 }
236 else
237 _generator.complete();
238 }
239 else
240 _generator.complete();
241 }
242 }
243
244 if (_generator.isComplete() && !_requestComplete)
245 {
246 _requestComplete = true;
247 _exchange.getEventListener().onRequestComplete();
248 }
249
250
251 if (!_parser.isComplete() && (_generator.isComplete() || _generator.isCommitted() && !_endp.isBlocking()))
252 {
253 long filled = _parser.parseAvailable();
254 io += filled;
255 }
256
257 if (io > 0)
258 no_progress = 0;
259 else if (no_progress++ >= 2 && !_endp.isBlocking())
260 {
261
262 if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
263 {
264 long flushed = _generator.flushBuffer();
265 if (flushed>0)
266 continue;
267 }
268 return this;
269 }
270 }
271 catch (Throwable e)
272 {
273 Log.debug("Failure on " + _exchange, e);
274
275 if (e instanceof ThreadDeath)
276 throw (ThreadDeath)e;
277
278 synchronized (this)
279 {
280 if (_exchange != null)
281 {
282
283
284 if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING)
285 {
286 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
287 _exchange.getEventListener().onException(e);
288 }
289 }
290 }
291
292 failed = true;
293 if (e instanceof IOException)
294 throw (IOException)e;
295
296 if (e instanceof Error)
297 throw (Error)e;
298
299 if (e instanceof RuntimeException)
300 throw (RuntimeException)e;
301
302 throw new RuntimeException(e);
303 }
304 finally
305 {
306 boolean complete = false;
307 boolean close = failed;
308 if (!failed)
309 {
310
311 if (_generator.isComplete())
312 {
313 if (!_requestComplete)
314 {
315 _requestComplete = true;
316 _exchange.getEventListener().onRequestComplete();
317 }
318
319
320
321 if (_parser.isComplete())
322 {
323 _destination.getHttpClient().cancel(_timeout);
324 complete = true;
325 }
326 }
327 }
328
329 if (complete || failed)
330 {
331 synchronized (this)
332 {
333 if (!close)
334 close = shouldClose();
335
336 reset(true);
337
338 no_progress = 0;
339 if (_exchange != null)
340 {
341 HttpExchange exchange=_exchange;
342 _exchange.disassociate();
343 _exchange = null;
344
345 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
346 {
347 Connection switched=exchange.onSwitchProtocol(_endp);
348 if (switched!=null)
349 {
350
351 exchange = _pipeline;
352 _pipeline = null;
353 if (exchange!=null)
354 _destination.send(exchange);
355
356 return switched;
357 }
358 }
359
360 if (_pipeline == null)
361 {
362 if (!isReserved())
363 _destination.returnConnection(this, close);
364 }
365 else
366 {
367 if (close)
368 {
369 if (!isReserved())
370 _destination.returnConnection(this,close);
371
372 exchange = _pipeline;
373 _pipeline = null;
374 _destination.send(exchange);
375 }
376 else
377 {
378 exchange = _pipeline;
379 _pipeline = null;
380 send(exchange);
381 }
382 }
383
384 }
385 }
386 }
387 }
388 }
389 }
390 finally
391 {
392 if (_exchange != null && _exchange.isAssociated())
393 {
394 _exchange.disassociate();
395 }
396
397 if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp instanceof AsyncEndPoint)
398 {
399 ((AsyncEndPoint)_endp).setWritable(false);
400 }
401 }
402
403 return this;
404 }
405
406 public boolean isIdle()
407 {
408 synchronized (this)
409 {
410 return _exchange == null;
411 }
412 }
413
414
415
416
417 public boolean isSuspended()
418 {
419 return false;
420 }
421
422 public EndPoint getEndPoint()
423 {
424 return _endp;
425 }
426
427 private void commitRequest() throws IOException
428 {
429 synchronized (this)
430 {
431 _status=0;
432 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
433 throw new IllegalStateException();
434
435 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
436 _generator.setVersion(_exchange.getVersion());
437
438 String uri = _exchange.getURI();
439 if (_destination.isProxied() && uri.startsWith("/"))
440 {
441
442 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
443 + _destination.getAddress().getPort() + uri;
444 Authentication auth = _destination.getProxyAuthentication();
445 if (auth != null)
446 auth.setCredentials(_exchange);
447 }
448
449 String method=_exchange.getMethod();
450 _generator.setRequest(method, uri);
451 _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
452
453 HttpFields requestHeaders = _exchange.getRequestFields();
454 if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
455 {
456 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
457 requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
458 }
459
460 Buffer requestContent = _exchange.getRequestContent();
461 if (requestContent != null)
462 {
463 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
464 _generator.completeHeader(requestHeaders,false);
465 _generator.addContent(new View(requestContent),true);
466 }
467 else
468 {
469 InputStream requestContentStream = _exchange.getRequestContentSource();
470 if (requestContentStream != null)
471 {
472 _generator.completeHeader(requestHeaders, false);
473 int available = requestContentStream.available();
474 if (available > 0)
475 {
476
477
478 byte[] buf = new byte[available];
479 int length = requestContentStream.read(buf);
480 _generator.addContent(new ByteArrayBuffer(buf, 0, length), false);
481 }
482 }
483 else
484 {
485 requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
486 _generator.completeHeader(requestHeaders, true);
487 }
488 }
489
490 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
491 }
492 }
493
494 protected void reset(boolean returnBuffers) throws IOException
495 {
496 _requestComplete = false;
497 _connectionHeader = null;
498 _parser.reset(returnBuffers);
499 _generator.reset(returnBuffers);
500 _http11 = true;
501 }
502
503 private boolean shouldClose()
504 {
505 if (_connectionHeader!=null)
506 {
507 if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
508 return true;
509 if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
510 return false;
511 }
512 return !_http11;
513 }
514
515 private class Handler extends HttpParser.EventHandler
516 {
517 @Override
518 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
519 {
520
521
522
523
524
525 }
526
527 @Override
528 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
529 {
530 HttpExchange exchange = _exchange;
531 if (exchange!=null)
532 {
533 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
534 _status=status;
535 exchange.getEventListener().onResponseStatus(version,status,reason);
536 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
537 }
538 }
539
540 @Override
541 public void parsedHeader(Buffer name, Buffer value) throws IOException
542 {
543 HttpExchange exchange = _exchange;
544 if (exchange!=null)
545 {
546 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
547 {
548 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
549 }
550 exchange.getEventListener().onResponseHeader(name,value);
551 }
552 }
553
554 @Override
555 public void headerComplete() throws IOException
556 {
557 if (_endp instanceof AsyncEndPoint)
558 ((AsyncEndPoint)_endp).scheduleIdle();
559 HttpExchange exchange = _exchange;
560 if (exchange!=null)
561 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
562 }
563
564 @Override
565 public void content(Buffer ref) throws IOException
566 {
567 if (_endp instanceof AsyncEndPoint)
568 ((AsyncEndPoint)_endp).scheduleIdle();
569 HttpExchange exchange = _exchange;
570 if (exchange!=null)
571 exchange.getEventListener().onResponseContent(ref);
572 }
573
574 @Override
575 public void messageComplete(long contextLength) throws IOException
576 {
577 HttpExchange exchange = _exchange;
578 if (exchange!=null)
579 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
580 }
581 }
582
583 @Override
584 public String toString()
585 {
586 return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
587 }
588
589 public String toDetailString()
590 {
591 return toString() + " ex=" + _exchange + " " + _timeout.getAge();
592 }
593
594 public void close() throws IOException
595 {
596 _endp.close();
597 }
598
599 public void setIdleTimeout()
600 {
601 synchronized (this)
602 {
603 if (_idle.compareAndSet(false,true))
604 _destination.getHttpClient().scheduleIdle(_timeout);
605 else
606 throw new IllegalStateException();
607 }
608 }
609
610 public boolean cancelIdleTimeout()
611 {
612 synchronized (this)
613 {
614 if (_idle.compareAndSet(true,false))
615 {
616 _destination.getHttpClient().cancel(_timeout);
617 return true;
618 }
619 }
620
621 return false;
622 }
623
624 private class TimeoutTask extends Timeout.Task
625 {
626 @Override
627 public void expired()
628 {
629 HttpExchange ex = null;
630 try
631 {
632 synchronized (HttpConnection.this)
633 {
634 ex = _exchange;
635 _exchange = null;
636 if (ex != null)
637 {
638 ex.disassociate();
639 _destination.returnConnection(HttpConnection.this, true);
640 }
641 else if (_idle.compareAndSet(true,false))
642 {
643 _destination.returnIdleConnection(HttpConnection.this);
644 }
645 }
646 }
647 catch (Exception e)
648 {
649 Log.debug(e);
650 }
651 finally
652 {
653 try
654 {
655 close();
656 }
657 catch (IOException e)
658 {
659 Log.ignore(e);
660 }
661
662 if (ex != null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
663 {
664 ex.setStatus(HttpExchange.STATUS_EXPIRED);
665 }
666 }
667 }
668 }
669 }