1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.util.Collections;
19 import java.util.concurrent.atomic.AtomicBoolean;
20
21 import org.eclipse.jetty.client.security.Authentication;
22 import org.eclipse.jetty.http.HttpFields;
23 import org.eclipse.jetty.http.HttpGenerator;
24 import org.eclipse.jetty.http.HttpHeaderValues;
25 import org.eclipse.jetty.http.HttpHeaders;
26 import org.eclipse.jetty.http.HttpMethods;
27 import org.eclipse.jetty.http.HttpParser;
28 import org.eclipse.jetty.http.HttpSchemes;
29 import org.eclipse.jetty.http.HttpStatus;
30 import org.eclipse.jetty.http.HttpVersions;
31 import org.eclipse.jetty.io.AbstractConnection;
32 import org.eclipse.jetty.io.Buffer;
33 import org.eclipse.jetty.io.Buffers;
34 import org.eclipse.jetty.io.ByteArrayBuffer;
35 import org.eclipse.jetty.io.Connection;
36 import org.eclipse.jetty.io.EndPoint;
37 import org.eclipse.jetty.io.EofException;
38 import org.eclipse.jetty.io.View;
39 import org.eclipse.jetty.util.component.AggregateLifeCycle;
40 import org.eclipse.jetty.util.component.Dumpable;
41 import org.eclipse.jetty.util.log.Log;
42 import org.eclipse.jetty.util.log.Logger;
43 import org.eclipse.jetty.util.thread.Timeout;
44
45
46
47
48
49 public abstract class AbstractHttpConnection extends AbstractConnection implements Dumpable
50 {
51 private static final Logger LOG = Log.getLogger(AbstractHttpConnection.class);
52
53 protected HttpDestination _destination;
54 protected HttpGenerator _generator;
55 protected HttpParser _parser;
56 protected boolean _http11 = true;
57 protected int _status;
58 protected Buffer _connectionHeader;
59 protected boolean _reserved;
60
61
62 protected volatile HttpExchange _exchange;
63 protected HttpExchange _pipeline;
64 private final Timeout.Task _idleTimeout = new ConnectionIdleTask();
65 private AtomicBoolean _idle = new AtomicBoolean(false);
66
67
68 AbstractHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
69 {
70 super(endp);
71
72 _generator = new HttpGenerator(requestBuffers,endp);
73 _parser = new HttpParser(responseBuffers,endp,new Handler());
74 }
75
76 public void setReserved (boolean reserved)
77 {
78 _reserved = reserved;
79 }
80
81 public boolean isReserved()
82 {
83 return _reserved;
84 }
85
86 public HttpDestination getDestination()
87 {
88 return _destination;
89 }
90
91 public void setDestination(HttpDestination destination)
92 {
93 _destination = destination;
94 }
95
96 public boolean send(HttpExchange ex) throws IOException
97 {
98 LOG.debug("Send {} on {}",ex,this);
99 synchronized (this)
100 {
101 if (_exchange != null)
102 {
103 if (_pipeline != null)
104 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange);
105 _pipeline = ex;
106 return true;
107 }
108
109 _exchange = ex;
110 _exchange.associate(this);
111
112
113 if (!_endp.isOpen())
114 {
115 _exchange.disassociate();
116 _exchange = null;
117 return false;
118 }
119
120 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
121
122 adjustIdleTimeout();
123
124 return true;
125 }
126 }
127
128 private void adjustIdleTimeout() throws IOException
129 {
130
131
132
133
134
135 long timeout = _exchange.getTimeout();
136 if (timeout <= 0)
137 timeout = _destination.getHttpClient().getTimeout();
138
139 long endPointTimeout = _endp.getMaxIdleTime();
140
141 if (timeout > 0 && timeout > endPointTimeout)
142 {
143
144
145
146 _endp.setMaxIdleTime(2 * (int)timeout);
147 }
148 }
149
150 public abstract Connection handle() throws IOException;
151
152
153 public boolean isIdle()
154 {
155 synchronized (this)
156 {
157 return _exchange == null;
158 }
159 }
160
161 public boolean isSuspended()
162 {
163 return false;
164 }
165
166 public void onClose()
167 {
168 }
169
170 protected void commitRequest() throws IOException
171 {
172 synchronized (this)
173 {
174 _status=0;
175 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
176 throw new IllegalStateException();
177
178 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
179 _generator.setVersion(_exchange.getVersion());
180
181 String method=_exchange.getMethod();
182 String uri = _exchange.getRequestURI();
183 if (_destination.isProxied())
184 {
185 if (!HttpMethods.CONNECT.equals(method) && uri.startsWith("/"))
186 {
187 boolean secure = _destination.isSecure();
188 String host = _destination.getAddress().getHost();
189 int port = _destination.getAddress().getPort();
190 StringBuilder absoluteURI = new StringBuilder();
191 absoluteURI.append(secure ? HttpSchemes.HTTPS : HttpSchemes.HTTP);
192 absoluteURI.append("://");
193 absoluteURI.append(host);
194
195 if (!(secure && port == 443 || !secure && port == 80))
196 absoluteURI.append(":").append(port);
197 absoluteURI.append(uri);
198 uri = absoluteURI.toString();
199 }
200 Authentication auth = _destination.getProxyAuthentication();
201 if (auth != null)
202 auth.setCredentials(_exchange);
203 }
204
205 _generator.setRequest(method, uri);
206 _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
207
208 HttpFields requestHeaders = _exchange.getRequestFields();
209 if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
210 {
211 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
212 requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
213 }
214
215 Buffer requestContent = _exchange.getRequestContent();
216 if (requestContent != null)
217 {
218 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
219 _generator.completeHeader(requestHeaders,false);
220 _generator.addContent(new View(requestContent),true);
221 }
222 else
223 {
224 InputStream requestContentStream = _exchange.getRequestContentSource();
225 if (requestContentStream != null)
226 {
227 _generator.completeHeader(requestHeaders, false);
228 int available = requestContentStream.available();
229 if (available > 0)
230 {
231
232
233 byte[] buf = new byte[available];
234 int length = requestContentStream.read(buf);
235 _generator.addContent(new ByteArrayBuffer(buf, 0, length), false);
236 }
237 }
238 else
239 {
240 requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
241 _generator.completeHeader(requestHeaders, true);
242 }
243 }
244
245 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
246 }
247 }
248
249 protected void reset() throws IOException
250 {
251 _connectionHeader = null;
252 _parser.reset();
253 _generator.reset();
254 _http11 = true;
255 }
256
257
258 private class Handler extends HttpParser.EventHandler
259 {
260 @Override
261 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
262 {
263
264
265
266
267
268 }
269
270 @Override
271 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
272 {
273 HttpExchange exchange = _exchange;
274 if (exchange==null)
275 {
276 LOG.warn("No exchange for response");
277 _endp.close();
278 return;
279 }
280
281 switch(status)
282 {
283 case HttpStatus.CONTINUE_100:
284 case HttpStatus.PROCESSING_102:
285
286 exchange.setEventListener(new NonFinalResponseListener(exchange));
287 break;
288
289 case HttpStatus.OK_200:
290
291 if (HttpMethods.CONNECT.equalsIgnoreCase(exchange.getMethod()))
292 _parser.setHeadResponse(true);
293 break;
294 }
295
296 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
297 _status=status;
298 exchange.getEventListener().onResponseStatus(version,status,reason);
299 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
300
301 }
302
303 @Override
304 public void parsedHeader(Buffer name, Buffer value) throws IOException
305 {
306 HttpExchange exchange = _exchange;
307 if (exchange!=null)
308 {
309 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
310 {
311 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
312 }
313 exchange.getEventListener().onResponseHeader(name,value);
314 }
315 }
316
317 @Override
318 public void headerComplete() throws IOException
319 {
320 HttpExchange exchange = _exchange;
321 if (exchange!=null)
322 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
323 }
324
325 @Override
326 public void content(Buffer ref) throws IOException
327 {
328 HttpExchange exchange = _exchange;
329 if (exchange!=null)
330 exchange.getEventListener().onResponseContent(ref);
331 }
332
333 @Override
334 public void messageComplete(long contextLength) throws IOException
335 {
336 HttpExchange exchange = _exchange;
337 if (exchange!=null)
338 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
339 }
340
341 @Override
342 public void earlyEOF()
343 {
344 HttpExchange exchange = _exchange;
345 if (exchange!=null)
346 {
347 if (!exchange.isDone())
348 {
349 if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
350 exchange.getEventListener().onException(new EofException("early EOF"));
351 }
352 }
353 }
354
355
356 }
357
358 @Override
359 public String toString()
360 {
361 return String.format("%s %s g=%s p=%s",
362 super.toString(),
363 _destination == null ? "?.?.?.?:??" : _destination.getAddress(),
364 _generator,
365 _parser);
366 }
367
368 public String toDetailString()
369 {
370 return toString() + " ex=" + _exchange + " idle for " + _idleTimeout.getAge();
371 }
372
373 public void close() throws IOException
374 {
375
376
377
378 HttpExchange exchange = _exchange;
379 if (exchange != null && !exchange.isDone())
380 {
381 switch (exchange.getStatus())
382 {
383 case HttpExchange.STATUS_CANCELLED:
384 case HttpExchange.STATUS_CANCELLING:
385 case HttpExchange.STATUS_COMPLETED:
386 case HttpExchange.STATUS_EXCEPTED:
387 case HttpExchange.STATUS_EXPIRED:
388 break;
389 case HttpExchange.STATUS_PARSING_CONTENT:
390 if (_endp.isInputShutdown() && _parser.isState(HttpParser.STATE_EOF_CONTENT))
391 break;
392 default:
393 String exch= exchange.toString();
394 String reason = _endp.isOpen()?(_endp.isInputShutdown()?"half closed: ":"local close: "):"closed: ";
395 if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
396 exchange.getEventListener().onException(new EofException(reason+exch));
397 }
398 }
399
400 if (_endp.isOpen())
401 {
402 _endp.close();
403 _destination.returnConnection(this, true);
404 }
405 }
406
407 public void setIdleTimeout()
408 {
409 synchronized (this)
410 {
411 if (_idle.compareAndSet(false, true))
412 _destination.getHttpClient().scheduleIdle(_idleTimeout);
413 else
414 throw new IllegalStateException();
415 }
416 }
417
418 public boolean cancelIdleTimeout()
419 {
420 synchronized (this)
421 {
422 if (_idle.compareAndSet(true, false))
423 {
424 _destination.getHttpClient().cancel(_idleTimeout);
425 return true;
426 }
427 }
428
429 return false;
430 }
431
432 protected void exchangeExpired(HttpExchange exchange)
433 {
434 synchronized (this)
435 {
436
437
438 if (_exchange == exchange)
439 {
440 try
441 {
442 _destination.returnConnection(this, true);
443 }
444 catch (IOException x)
445 {
446 LOG.ignore(x);
447 }
448 }
449 }
450 }
451
452
453
454
455
456 public String dump()
457 {
458 return AggregateLifeCycle.dump(this);
459 }
460
461
462
463
464
465 public void dump(Appendable out, String indent) throws IOException
466 {
467 synchronized (this)
468 {
469 out.append(String.valueOf(this)).append("\n");
470 AggregateLifeCycle.dump(out,indent,Collections.singletonList(_endp));
471 }
472 }
473
474
475 private class ConnectionIdleTask extends Timeout.Task
476 {
477
478 @Override
479 public void expired()
480 {
481
482 if (_idle.compareAndSet(true, false))
483 {
484 _destination.returnIdleConnection(AbstractHttpConnection.this);
485 }
486 }
487 }
488
489
490
491 private class NonFinalResponseListener implements HttpEventListener
492 {
493 final HttpExchange _exchange;
494 final HttpEventListener _next;
495
496
497 public NonFinalResponseListener(HttpExchange exchange)
498 {
499 _exchange=exchange;
500 _next=exchange.getEventListener();
501 }
502
503
504 public void onRequestCommitted() throws IOException
505 {
506 }
507
508
509 public void onRequestComplete() throws IOException
510 {
511 }
512
513
514 public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
515 {
516 }
517
518
519 public void onResponseHeader(Buffer name, Buffer value) throws IOException
520 {
521 _next.onResponseHeader(name,value);
522 }
523
524
525 public void onResponseHeaderComplete() throws IOException
526 {
527 _next.onResponseHeaderComplete();
528 }
529
530
531 public void onResponseContent(Buffer content) throws IOException
532 {
533 }
534
535
536 public void onResponseComplete() throws IOException
537 {
538 _exchange.setEventListener(_next);
539 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
540 _parser.reset();
541 }
542
543
544 public void onConnectionFailed(Throwable ex)
545 {
546 _exchange.setEventListener(_next);
547 _next.onConnectionFailed(ex);
548 }
549
550
551 public void onException(Throwable ex)
552 {
553 _exchange.setEventListener(_next);
554 _next.onException(ex);
555 }
556
557
558 public void onExpire()
559 {
560 _exchange.setEventListener(_next);
561 _next.onExpire();
562 }
563
564
565 public void onRetry()
566 {
567 _exchange.setEventListener(_next);
568 _next.onRetry();
569 }
570 }
571 }