1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.io.InterruptedIOException;
19
20 import org.eclipse.jetty.client.security.Authorization;
21 import org.eclipse.jetty.http.HttpGenerator;
22 import org.eclipse.jetty.http.HttpHeaderValues;
23 import org.eclipse.jetty.http.HttpHeaders;
24 import org.eclipse.jetty.http.HttpParser;
25 import org.eclipse.jetty.http.HttpSchemes;
26 import org.eclipse.jetty.http.HttpVersions;
27 import org.eclipse.jetty.http.ssl.SslSelectChannelEndPoint;
28 import org.eclipse.jetty.io.Buffer;
29 import org.eclipse.jetty.io.Buffers;
30 import org.eclipse.jetty.io.ByteArrayBuffer;
31 import org.eclipse.jetty.io.Connection;
32 import org.eclipse.jetty.io.EndPoint;
33 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.thread.Timeout;
36
37
38
39
40
41
42
43 public class HttpConnection implements Connection
44 {
45 HttpDestination _destination;
46 EndPoint _endp;
47 HttpGenerator _generator;
48 HttpParser _parser;
49 boolean _http11 = true;
50 Buffer _connectionHeader;
51 Buffer _requestContentChunk;
52 long _last;
53 boolean _requestComplete;
54 public String _message;
55 public Throwable _throwable;
56 public boolean _reserved;
57
58
59 volatile HttpExchange _exchange;
60 HttpExchange _pipeline;
61
62 public void dump() throws IOException
63 {
64 Log.info("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
65 Log.info("generator=" + _generator);
66 Log.info("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
67 Log.info("exchange=" + _exchange);
68 if (_endp instanceof SslSelectChannelEndPoint)
69 ((SslSelectChannelEndPoint)_endp).dump();
70 }
71
72 Timeout.Task _timeout = new Timeout.Task()
73 {
74 public void expired()
75 {
76 HttpExchange ex = null;
77 try
78 {
79 synchronized (HttpConnection.this)
80 {
81 ex = _exchange;
82 _exchange = null;
83 if (ex != null)
84 _destination.returnConnection(HttpConnection.this,true);
85 }
86 }
87 catch (Exception e)
88 {
89 Log.debug(e);
90 }
91 finally
92 {
93 try
94 {
95 _endp.close();
96 }
97 catch (IOException e)
98 {
99 Log.ignore(e);
100 }
101
102 if (ex!=null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
103 {
104 ex.setStatus(HttpExchange.STATUS_EXPIRED);
105 }
106 }
107 }
108 };
109
110
111 HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
112 {
113 _endp = endp;
114 _generator = new HttpGenerator(requestBuffers,endp);
115 _parser = new HttpParser(responseBuffers,endp,new Handler());
116 }
117
118 public void setReserved (boolean reserved)
119 {
120 _reserved = reserved;
121 }
122
123 public boolean isReserved()
124 {
125 return _reserved;
126 }
127
128
129 public HttpDestination getDestination()
130 {
131 return _destination;
132 }
133
134
135 public void setDestination(HttpDestination destination)
136 {
137 _destination = destination;
138 }
139
140
141 public boolean send(HttpExchange ex) throws IOException
142 {
143
144
145
146 _throwable = new Throwable();
147 synchronized (this)
148 {
149 if (_exchange != null)
150 {
151 if (_pipeline != null)
152 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange);
153 _pipeline = ex;
154 return true;
155 }
156
157 if (!_endp.isOpen())
158 return false;
159
160 ex.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
161 _exchange = ex;
162
163 if (_endp.isBlocking())
164 this.notify();
165 else
166 {
167 SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
168 scep.scheduleWrite();
169 }
170
171 if (!_endp.isBlocking())
172 _destination.getHttpClient().schedule(_timeout);
173
174 return true;
175 }
176 }
177
178
179 public void handle() throws IOException
180 {
181 int no_progress = 0;
182 long flushed = 0;
183
184 boolean failed = false;
185 while (_endp.isBufferingInput() || _endp.isOpen())
186 {
187 synchronized (this)
188 {
189 while (_exchange == null)
190 {
191 if (_endp.isBlocking())
192 {
193 try
194 {
195 this.wait();
196 }
197 catch (InterruptedException e)
198 {
199 throw new InterruptedIOException();
200 }
201 }
202 else
203 {
204
205 _parser.fill();
206 _parser.skipCRLF();
207 if (_parser.isMoreInBuffer())
208 {
209 Log.warn("unexpected data");
210 _endp.close();
211 }
212
213 return;
214 }
215 }
216 }
217 if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
218 {
219 no_progress = 0;
220 commitRequest();
221 }
222
223 try
224 {
225 long io = 0;
226 _endp.flush();
227
228 if (_generator.isComplete())
229 {
230 if (!_requestComplete)
231 {
232 _requestComplete = true;
233 _exchange.getEventListener().onRequestComplete();
234 }
235 }
236 else
237 {
238
239 synchronized (this)
240 {
241 if (_exchange == null)
242 continue;
243 flushed = _generator.flushBuffer();
244 io += flushed;
245 }
246
247 if (!_generator.isComplete())
248 {
249 InputStream in = _exchange.getRequestContentSource();
250 if (in != null)
251 {
252 if (_requestContentChunk == null || _requestContentChunk.length() == 0)
253 {
254 _requestContentChunk = _exchange.getRequestContentChunk();
255 if (_requestContentChunk != null)
256 _generator.addContent(_requestContentChunk,false);
257 else
258 _generator.complete();
259 io += _generator.flushBuffer();
260 }
261 }
262 else
263 _generator.complete();
264 }
265 }
266
267
268
269 if (!_parser.isComplete() && _generator.isCommitted())
270 {
271 long filled = _parser.parseAvailable();
272 io += filled;
273 }
274
275 if (io > 0)
276 no_progress = 0;
277 else if (no_progress++ >= 2 && !_endp.isBlocking())
278 {
279
280 if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
281 {
282 if (_generator.flushBuffer()>0)
283 continue;
284 }
285 return;
286 }
287 }
288 catch (Throwable e)
289 {
290 if (e instanceof ThreadDeath)
291 throw (ThreadDeath)e;
292
293 synchronized (this)
294 {
295 if (_exchange != null)
296 {
297 _exchange.getEventListener().onException(e);
298 _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
299 }
300 }
301 Log.warn("IOE on "+_exchange);
302 failed = true;
303 if (e instanceof IOException)
304 throw (IOException)e;
305
306 if (e instanceof Error)
307 throw (Error)e;
308
309 if (e instanceof RuntimeException)
310 throw (RuntimeException)e;
311
312 throw new RuntimeException(e);
313 }
314 finally
315 {
316 boolean complete = false;
317 boolean close = failed;
318 if (!failed)
319 {
320
321 if (_generator.isComplete())
322 {
323 if (!_requestComplete)
324 {
325 _requestComplete = true;
326 _exchange.getEventListener().onRequestComplete();
327 }
328
329
330
331 if (_parser.isComplete())
332 {
333 _destination.getHttpClient().cancel(_timeout);
334 complete = true;
335 }
336 }
337 }
338
339 if (complete || failed)
340 {
341 synchronized (this)
342 {
343 if (!close)
344 close = shouldClose();
345
346 reset(true);
347 no_progress = 0;
348 flushed = -1;
349 if (_exchange != null)
350 {
351 _exchange = null;
352
353 if (_pipeline == null)
354 {
355 if (!isReserved())
356 _destination.returnConnection(this,close);
357 if (close)
358 return;
359 }
360 else
361 {
362 if (close)
363 {
364 if (!isReserved())
365 _destination.returnConnection(this,close);
366 _destination.send(_pipeline);
367 _pipeline = null;
368 return;
369 }
370
371 HttpExchange ex = _pipeline;
372 _pipeline = null;
373
374 send(ex);
375 }
376 }
377 }
378 }
379 }
380 }
381 }
382
383
384 public boolean isIdle()
385 {
386 synchronized (this)
387 {
388 return _exchange == null;
389 }
390 }
391
392
393
394
395
396 public boolean isSuspended()
397 {
398 return false;
399 }
400
401
402 public EndPoint getEndPoint()
403 {
404 return _endp;
405 }
406
407
408 private void commitRequest() throws IOException
409 {
410 synchronized (this)
411 {
412 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
413 throw new IllegalStateException();
414
415 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
416 _generator.setVersion(_exchange._version);
417
418 String uri = _exchange._uri;
419 if (_destination.isProxied() && uri.startsWith("/"))
420 {
421
422 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
423 + _destination.getAddress().getPort() + uri;
424 Authorization auth = _destination.getProxyAuthentication();
425 if (auth != null)
426 auth.setCredentials(_exchange);
427 }
428
429 _generator.setRequest(_exchange._method,uri);
430
431 if (_exchange._version >= HttpVersions.HTTP_1_1_ORDINAL)
432 {
433 if (!_exchange._requestFields.containsKey(HttpHeaders.HOST_BUFFER))
434 _exchange._requestFields.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
435 }
436
437 if (_exchange._requestContent != null)
438 {
439 _exchange._requestFields.putLongField(HttpHeaders.CONTENT_LENGTH,_exchange._requestContent.length());
440 _generator.completeHeader(_exchange._requestFields,false);
441 _generator.addContent(_exchange._requestContent,true);
442 }
443 else if (_exchange._requestContentSource != null)
444 {
445 _generator.completeHeader(_exchange._requestFields,false);
446 int available = _exchange._requestContentSource.available();
447 if (available > 0)
448 {
449
450
451
452 byte[] buf = new byte[available];
453 int length = _exchange._requestContentSource.read(buf);
454 _generator.addContent(new ByteArrayBuffer(buf,0,length),false);
455 }
456 }
457 else
458 {
459 _exchange._requestFields.remove(HttpHeaders.CONTENT_LENGTH);
460
461
462
463
464
465 _generator.completeHeader(_exchange._requestFields,true);
466 }
467
468 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
469 }
470 }
471
472
473 protected void reset(boolean returnBuffers) throws IOException
474 {
475 _requestComplete = false;
476 _connectionHeader = null;
477 _parser.reset(returnBuffers);
478 _generator.reset(returnBuffers);
479 _http11 = true;
480 }
481
482
483 private boolean shouldClose()
484 {
485 if (_connectionHeader!=null)
486 {
487 if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
488 return true;
489 if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
490 return false;
491 }
492 return !_http11;
493 }
494
495
496 private class Handler extends HttpParser.EventHandler
497 {
498 @Override
499 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
500 {
501
502
503
504
505
506 }
507
508 @Override
509 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
510 {
511 HttpExchange exchange = _exchange;
512 if (exchange!=null)
513 {
514 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
515 exchange.getEventListener().onResponseStatus(version,status,reason);
516 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
517 }
518 }
519
520 @Override
521 public void parsedHeader(Buffer name, Buffer value) throws IOException
522 {
523 HttpExchange exchange = _exchange;
524 if (exchange!=null)
525 {
526 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
527 {
528 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
529 }
530 exchange.getEventListener().onResponseHeader(name,value);
531 }
532 }
533
534 @Override
535 public void headerComplete() throws IOException
536 {
537 HttpExchange exchange = _exchange;
538 if (exchange!=null)
539 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
540 }
541
542 @Override
543 public void content(Buffer ref) throws IOException
544 {
545 HttpExchange exchange = _exchange;
546 if (exchange!=null)
547 exchange.getEventListener().onResponseContent(ref);
548 }
549
550 @Override
551 public void messageComplete(long contextLength) throws IOException
552 {
553 HttpExchange exchange = _exchange;
554 if (exchange!=null)
555 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
556 }
557 }
558
559
560 public String toString()
561 {
562 return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
563 }
564
565
566 public String toDetailString()
567 {
568 return toString() + " ex=" + _exchange + " " + _timeout.getAge();
569 }
570
571
572
573
574
575 public long getLast()
576 {
577 return _last;
578 }
579
580
581
582
583
584
585 public void setLast(long last)
586 {
587 _last = last;
588 }
589
590
591 public void close() throws IOException
592 {
593 _endp.close();
594 }
595
596 }