1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.client;
15
16 import java.io.IOException;
17 import java.io.InputStream;
18 import java.util.Collections;
19 import java.util.concurrent.atomic.AtomicBoolean;
20
21 import org.eclipse.jetty.client.security.Authentication;
22 import org.eclipse.jetty.http.HttpFields;
23 import org.eclipse.jetty.http.HttpGenerator;
24 import org.eclipse.jetty.http.HttpHeaderValues;
25 import org.eclipse.jetty.http.HttpHeaders;
26 import org.eclipse.jetty.http.HttpMethods;
27 import org.eclipse.jetty.http.HttpParser;
28 import org.eclipse.jetty.http.HttpSchemes;
29 import org.eclipse.jetty.http.HttpStatus;
30 import org.eclipse.jetty.http.HttpVersions;
31 import org.eclipse.jetty.io.AbstractConnection;
32 import org.eclipse.jetty.io.Buffer;
33 import org.eclipse.jetty.io.Buffers;
34 import org.eclipse.jetty.io.ByteArrayBuffer;
35 import org.eclipse.jetty.io.Connection;
36 import org.eclipse.jetty.io.EndPoint;
37 import org.eclipse.jetty.io.EofException;
38 import org.eclipse.jetty.io.View;
39 import org.eclipse.jetty.util.component.AggregateLifeCycle;
40 import org.eclipse.jetty.util.component.Dumpable;
41 import org.eclipse.jetty.util.log.Log;
42 import org.eclipse.jetty.util.log.Logger;
43 import org.eclipse.jetty.util.thread.Timeout;
44
45
46
47
48
49 public abstract class AbstractHttpConnection extends AbstractConnection implements Dumpable
50 {
51 private static final Logger LOG = Log.getLogger(AbstractHttpConnection.class);
52
53 protected HttpDestination _destination;
54 protected HttpGenerator _generator;
55 protected HttpParser _parser;
56 protected boolean _http11 = true;
57 protected int _status;
58 protected Buffer _connectionHeader;
59 protected boolean _reserved;
60
61
62 protected volatile HttpExchange _exchange;
63 protected HttpExchange _pipeline;
64 private final Timeout.Task _idleTimeout = new ConnectionIdleTask();
65 private AtomicBoolean _idle = new AtomicBoolean(false);
66
67
68 AbstractHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
69 {
70 super(endp);
71
72 _generator = new HttpGenerator(requestBuffers,endp);
73 _parser = new HttpParser(responseBuffers,endp,new Handler());
74 }
75
76 public void setReserved (boolean reserved)
77 {
78 _reserved = reserved;
79 }
80
81 public boolean isReserved()
82 {
83 return _reserved;
84 }
85
86 public HttpDestination getDestination()
87 {
88 return _destination;
89 }
90
91 public void setDestination(HttpDestination destination)
92 {
93 _destination = destination;
94 }
95
96 public boolean send(HttpExchange ex) throws IOException
97 {
98 LOG.debug("Send {} on {}",ex,this);
99 synchronized (this)
100 {
101 if (_exchange != null)
102 {
103 if (_pipeline != null)
104 throw new IllegalStateException(this + " PIPELINED!!! _exchange=" + _exchange);
105 _pipeline = ex;
106 return true;
107 }
108
109 _exchange = ex;
110 _exchange.associate(this);
111
112
113 if (!_endp.isOpen())
114 {
115 _exchange.disassociate();
116 _exchange = null;
117 return false;
118 }
119
120 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
121
122 adjustIdleTimeout();
123
124 return true;
125 }
126 }
127
128 private void adjustIdleTimeout() throws IOException
129 {
130
131
132
133
134
135 long timeout = _exchange.getTimeout();
136 if (timeout <= 0)
137 timeout = _destination.getHttpClient().getTimeout();
138
139 long endPointTimeout = _endp.getMaxIdleTime();
140
141 if (timeout > 0 && timeout > endPointTimeout)
142 {
143
144
145
146 _endp.setMaxIdleTime(2 * (int)timeout);
147 }
148 }
149
150 public abstract Connection handle() throws IOException;
151
152
153 public boolean isIdle()
154 {
155 synchronized (this)
156 {
157 return _exchange == null;
158 }
159 }
160
161 public boolean isSuspended()
162 {
163 return false;
164 }
165
166 public void onClose()
167 {
168 }
169
170 protected void commitRequest() throws IOException
171 {
172 synchronized (this)
173 {
174 _status=0;
175 if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
176 throw new IllegalStateException();
177
178 _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
179 _generator.setVersion(_exchange.getVersion());
180
181 String method=_exchange.getMethod();
182 String uri = _exchange.getRequestURI();
183 if (_destination.isProxied() && !HttpMethods.CONNECT.equals(method) && uri.startsWith("/"))
184 {
185 boolean secure = _destination.isSecure();
186 String host = _destination.getAddress().getHost();
187 int port = _destination.getAddress().getPort();
188 StringBuilder absoluteURI = new StringBuilder();
189 absoluteURI.append(secure ? HttpSchemes.HTTPS : HttpSchemes.HTTP);
190 absoluteURI.append("://");
191 absoluteURI.append(host);
192
193 if (!(secure && port == 443 || !secure && port == 80))
194 absoluteURI.append(":").append(port);
195 absoluteURI.append(uri);
196 uri = absoluteURI.toString();
197 Authentication auth = _destination.getProxyAuthentication();
198 if (auth != null)
199 auth.setCredentials(_exchange);
200 }
201
202 _generator.setRequest(method, uri);
203 _parser.setHeadResponse(HttpMethods.HEAD.equalsIgnoreCase(method));
204
205 HttpFields requestHeaders = _exchange.getRequestFields();
206 if (_exchange.getVersion() >= HttpVersions.HTTP_1_1_ORDINAL)
207 {
208 if (!requestHeaders.containsKey(HttpHeaders.HOST_BUFFER))
209 requestHeaders.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
210 }
211
212 Buffer requestContent = _exchange.getRequestContent();
213 if (requestContent != null)
214 {
215 requestHeaders.putLongField(HttpHeaders.CONTENT_LENGTH, requestContent.length());
216 _generator.completeHeader(requestHeaders,false);
217 _generator.addContent(new View(requestContent),true);
218 }
219 else
220 {
221 InputStream requestContentStream = _exchange.getRequestContentSource();
222 if (requestContentStream != null)
223 {
224 _generator.completeHeader(requestHeaders, false);
225 int available = requestContentStream.available();
226 if (available > 0)
227 {
228
229
230 byte[] buf = new byte[available];
231 int length = requestContentStream.read(buf);
232 _generator.addContent(new ByteArrayBuffer(buf, 0, length), false);
233 }
234 }
235 else
236 {
237 requestHeaders.remove(HttpHeaders.CONTENT_LENGTH);
238 _generator.completeHeader(requestHeaders, true);
239 }
240 }
241
242 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
243 }
244 }
245
246 protected void reset() throws IOException
247 {
248 _connectionHeader = null;
249 _parser.reset();
250 _generator.reset();
251 _http11 = true;
252 }
253
254
255 private class Handler extends HttpParser.EventHandler
256 {
257 @Override
258 public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
259 {
260
261
262
263
264
265 }
266
267 @Override
268 public void startResponse(Buffer version, int status, Buffer reason) throws IOException
269 {
270 HttpExchange exchange = _exchange;
271 if (exchange==null)
272 {
273 LOG.warn("No exchange for response");
274 _endp.close();
275 return;
276 }
277
278 switch(status)
279 {
280 case HttpStatus.CONTINUE_100:
281 case HttpStatus.PROCESSING_102:
282
283 exchange.setEventListener(new NonFinalResponseListener(exchange));
284 break;
285
286 case HttpStatus.OK_200:
287
288 if (HttpMethods.CONNECT.equalsIgnoreCase(exchange.getMethod()))
289 _parser.setHeadResponse(true);
290 break;
291 }
292
293 _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
294 _status=status;
295 exchange.getEventListener().onResponseStatus(version,status,reason);
296 exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
297
298 }
299
300 @Override
301 public void parsedHeader(Buffer name, Buffer value) throws IOException
302 {
303 HttpExchange exchange = _exchange;
304 if (exchange!=null)
305 {
306 if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
307 {
308 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
309 }
310 exchange.getEventListener().onResponseHeader(name,value);
311 }
312 }
313
314 @Override
315 public void headerComplete() throws IOException
316 {
317 HttpExchange exchange = _exchange;
318 if (exchange!=null)
319 exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
320 }
321
322 @Override
323 public void content(Buffer ref) throws IOException
324 {
325 HttpExchange exchange = _exchange;
326 if (exchange!=null)
327 exchange.getEventListener().onResponseContent(ref);
328 }
329
330 @Override
331 public void messageComplete(long contextLength) throws IOException
332 {
333 HttpExchange exchange = _exchange;
334 if (exchange!=null)
335 exchange.setStatus(HttpExchange.STATUS_COMPLETED);
336 }
337
338 @Override
339 public void earlyEOF()
340 {
341 HttpExchange exchange = _exchange;
342 if (exchange!=null)
343 {
344 if (!exchange.isDone())
345 {
346 if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
347 exchange.getEventListener().onException(new EofException("early EOF"));
348 }
349 }
350 }
351
352
353 }
354
355 @Override
356 public String toString()
357 {
358 return String.format("%s %s g=%s p=%s",
359 super.toString(),
360 _destination == null ? "?.?.?.?:??" : _destination.getAddress(),
361 _generator,
362 _parser);
363 }
364
365 public String toDetailString()
366 {
367 return toString() + " ex=" + _exchange + " idle for " + _idleTimeout.getAge();
368 }
369
370 public void close() throws IOException
371 {
372
373
374
375 HttpExchange exchange = _exchange;
376 if (exchange != null && !exchange.isDone())
377 {
378 switch (exchange.getStatus())
379 {
380 case HttpExchange.STATUS_CANCELLED:
381 case HttpExchange.STATUS_CANCELLING:
382 case HttpExchange.STATUS_COMPLETED:
383 case HttpExchange.STATUS_EXCEPTED:
384 case HttpExchange.STATUS_EXPIRED:
385 break;
386 case HttpExchange.STATUS_PARSING_CONTENT:
387 if (_endp.isInputShutdown() && _parser.isState(HttpParser.STATE_EOF_CONTENT))
388 break;
389 default:
390 String exch= exchange.toString();
391 String reason = _endp.isOpen()?(_endp.isInputShutdown()?"half closed: ":"local close: "):"closed: ";
392 if (exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
393 exchange.getEventListener().onException(new EofException(reason+exch));
394 }
395 }
396
397 if (_endp.isOpen())
398 {
399 _endp.close();
400 _destination.returnConnection(this, true);
401 }
402 }
403
404 public void setIdleTimeout()
405 {
406 synchronized (this)
407 {
408 if (_idle.compareAndSet(false, true))
409 _destination.getHttpClient().scheduleIdle(_idleTimeout);
410 else
411 throw new IllegalStateException();
412 }
413 }
414
415 public boolean cancelIdleTimeout()
416 {
417 synchronized (this)
418 {
419 if (_idle.compareAndSet(true, false))
420 {
421 _destination.getHttpClient().cancel(_idleTimeout);
422 return true;
423 }
424 }
425
426 return false;
427 }
428
429 protected void exchangeExpired(HttpExchange exchange)
430 {
431 synchronized (this)
432 {
433
434
435 if (_exchange == exchange)
436 {
437 try
438 {
439 _destination.returnConnection(this, true);
440 }
441 catch (IOException x)
442 {
443 LOG.ignore(x);
444 }
445 }
446 }
447 }
448
449
450
451
452
453 public String dump()
454 {
455 return AggregateLifeCycle.dump(this);
456 }
457
458
459
460
461
462 public void dump(Appendable out, String indent) throws IOException
463 {
464 synchronized (this)
465 {
466 out.append(String.valueOf(this)).append("\n");
467 AggregateLifeCycle.dump(out,indent,Collections.singletonList(_endp));
468 }
469 }
470
471
472 private class ConnectionIdleTask extends Timeout.Task
473 {
474
475 @Override
476 public void expired()
477 {
478
479 if (_idle.compareAndSet(true, false))
480 {
481 _destination.returnIdleConnection(AbstractHttpConnection.this);
482 }
483 }
484 }
485
486
487
488 private class NonFinalResponseListener implements HttpEventListener
489 {
490 final HttpExchange _exchange;
491 final HttpEventListener _next;
492
493
494 public NonFinalResponseListener(HttpExchange exchange)
495 {
496 _exchange=exchange;
497 _next=exchange.getEventListener();
498 }
499
500
501 public void onRequestCommitted() throws IOException
502 {
503 }
504
505
506 public void onRequestComplete() throws IOException
507 {
508 }
509
510
511 public void onResponseStatus(Buffer version, int status, Buffer reason) throws IOException
512 {
513 }
514
515
516 public void onResponseHeader(Buffer name, Buffer value) throws IOException
517 {
518 _next.onResponseHeader(name,value);
519 }
520
521
522 public void onResponseHeaderComplete() throws IOException
523 {
524 _next.onResponseHeaderComplete();
525 }
526
527
528 public void onResponseContent(Buffer content) throws IOException
529 {
530 }
531
532
533 public void onResponseComplete() throws IOException
534 {
535 _exchange.setEventListener(_next);
536 _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
537 _parser.reset();
538 }
539
540
541 public void onConnectionFailed(Throwable ex)
542 {
543 _exchange.setEventListener(_next);
544 _next.onConnectionFailed(ex);
545 }
546
547
548 public void onException(Throwable ex)
549 {
550 _exchange.setEventListener(_next);
551 _next.onException(ex);
552 }
553
554
555 public void onExpire()
556 {
557 _exchange.setEventListener(_next);
558 _next.onExpire();
559 }
560
561
562 public void onRetry()
563 {
564 _exchange.setEventListener(_next);
565 _next.onRetry();
566 }
567 }
568 }