1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.nio.ByteBuffer;
22 import java.nio.channels.ByteChannel;
23 import java.util.concurrent.RejectedExecutionException;
24
25 import org.eclipse.jetty.http.HttpGenerator;
26 import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
27 import org.eclipse.jetty.http.HttpHeader;
28 import org.eclipse.jetty.http.HttpHeaderValue;
29 import org.eclipse.jetty.http.HttpMethod;
30 import org.eclipse.jetty.http.HttpParser;
31 import org.eclipse.jetty.http.HttpStatus;
32 import org.eclipse.jetty.http.HttpVersion;
33 import org.eclipse.jetty.io.AbstractConnection;
34 import org.eclipse.jetty.io.ByteBufferPool;
35 import org.eclipse.jetty.io.Connection;
36 import org.eclipse.jetty.io.EndPoint;
37 import org.eclipse.jetty.io.EofException;
38 import org.eclipse.jetty.util.BufferUtil;
39 import org.eclipse.jetty.util.Callback;
40 import org.eclipse.jetty.util.IteratingNestedCallback;
41 import org.eclipse.jetty.util.log.Log;
42 import org.eclipse.jetty.util.log.Logger;
43
44
45
46
47 public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport
48 {
49 public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
50 private static final boolean REQUEST_BUFFER_DIRECT=false;
51 private static final boolean HEADER_BUFFER_DIRECT=false;
52 private static final boolean CHUNK_BUFFER_DIRECT=false;
53 private static final Logger LOG = Log.getLogger(HttpConnection.class);
54 private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
55
56 private final HttpConfiguration _config;
57 private final Connector _connector;
58 private final ByteBufferPool _bufferPool;
59 private final HttpGenerator _generator;
60 private final HttpChannelOverHttp _channel;
61 private final HttpParser _parser;
62 private volatile ByteBuffer _requestBuffer = null;
63 private volatile ByteBuffer _chunk = null;
64
65
66 public static HttpConnection getCurrentConnection()
67 {
68 return __currentConnection.get();
69 }
70
71 protected static void setCurrentConnection(HttpConnection connection)
72 {
73 __currentConnection.set(connection);
74 }
75
76 public HttpConfiguration getHttpConfiguration()
77 {
78 return _config;
79 }
80
/**
 * Builds a connection bound to {@code endPoint}, wiring up the generator, channel
 * and parser through the overridable {@code newXxx()} factory methods.
 *
 * @param config    HTTP configuration used by the generator, parser and channel
 * @param connector owning connector; supplies the executor and the buffer pool
 * @param endPoint  end point this connection reads from and writes to
 */
public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
{
    // NOTE(review): the meaning of the third argument is defined by AbstractConnection,
    // which is not visible here — confirm before changing it.
    super(endPoint, connector.getExecutor(), true);

    _connector = connector;
    _config = config;
    _bufferPool = connector.getByteBufferPool();

    // Factory call order matters: the channel must exist before the parser is built,
    // because newRequestHandler() hands the channel to the parser.
    _generator = newHttpGenerator();
    _channel = newHttpChannel(newHttpInput());
    _parser = newHttpParser();

    LOG.debug("New HTTP Connection {}", this);
}
96
97 protected HttpGenerator newHttpGenerator()
98 {
99 return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
100 }
101
102 protected HttpInput<ByteBuffer> newHttpInput()
103 {
104 return new HttpInputOverHTTP(this);
105 }
106
107 protected HttpChannelOverHttp newHttpChannel(HttpInput<ByteBuffer> httpInput)
108 {
109 return new HttpChannelOverHttp(_connector, _config, getEndPoint(), this, httpInput);
110 }
111
112 protected HttpParser newHttpParser()
113 {
114 return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
115 }
116
117 protected HttpParser.RequestHandler<ByteBuffer> newRequestHandler()
118 {
119 return _channel;
120 }
121
122 public Server getServer()
123 {
124 return _connector.getServer();
125 }
126
127 public Connector getConnector()
128 {
129 return _connector;
130 }
131
132 public HttpChannel<?> getHttpChannel()
133 {
134 return _channel;
135 }
136
137 public HttpParser getParser()
138 {
139 return _parser;
140 }
141
142 @Override
143 public int getMessagesIn()
144 {
145 return getHttpChannel().getRequests();
146 }
147
148 @Override
149 public int getMessagesOut()
150 {
151 return getHttpChannel().getRequests();
152 }
153
154 void releaseRequestBuffer()
155 {
156 if (_requestBuffer != null && !_requestBuffer.hasRemaining())
157 {
158 ByteBuffer buffer=_requestBuffer;
159 _requestBuffer=null;
160 _bufferPool.release(buffer);
161 }
162 }
163
164 public ByteBuffer getRequestBuffer()
165 {
166 if (_requestBuffer == null)
167 _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
168 return _requestBuffer;
169 }
170
171
172
173
174
175
176
177
178
179 @Override
180 public void onFillable()
181 {
182 LOG.debug("{} onFillable {}", this, _channel.getState());
183
184 setCurrentConnection(this);
185 int filled=Integer.MAX_VALUE;
186 boolean suspended=false;
187 try
188 {
189
190 while (!suspended && getEndPoint().getConnection()==this)
191 {
192
193 if (BufferUtil.isEmpty(_requestBuffer))
194 {
195
196 if (filled<=0)
197 break;
198
199
200 if(getEndPoint().isInputShutdown())
201 {
202
203 filled=-1;
204 _parser.atEOF();
205 }
206 else
207 {
208
209 if (_requestBuffer == null)
210 _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
211
212
213 filled = getEndPoint().fill(_requestBuffer);
214 if (filled==0)
215 filled = getEndPoint().fill(_requestBuffer);
216
217
218 if (filled < 0)
219 _parser.atEOF();
220 }
221 }
222
223
224 if (_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
225 {
226
227
228
229 suspended = !_channel.handle();
230 }
231
232 }
233 }
234 catch (EofException e)
235 {
236 LOG.debug(e);
237 }
238 catch (Exception e)
239 {
240 if (_parser.isIdle())
241 LOG.debug(e);
242 else
243 LOG.warn(this.toString(), e);
244 close();
245 }
246 finally
247 {
248 setCurrentConnection(null);
249 if (!suspended && getEndPoint().isOpen() && getEndPoint().getConnection()==this)
250 {
251 fillInterested();
252 }
253 }
254 }
255
256
257 @Override
258 protected void onFillInterestedFailed(Throwable cause)
259 {
260 _parser.close();
261 super.onFillInterestedFailed(cause);
262 }
263
264 @Override
265 public void onOpen()
266 {
267 super.onOpen();
268 fillInterested();
269 }
270
271 @Override
272 public void run()
273 {
274 onFillable();
275 }
276
277
278 @Override
279 public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
280 {
281 if (info==null)
282 new ContentCallback(content,lastContent,callback).iterate();
283 else
284 {
285
286 if (_channel.isExpecting100Continue())
287
288 _generator.setPersistent(false);
289 new CommitCallback(info,content,lastContent,callback).iterate();
290 }
291 }
292
293 @Override
294 public void send(ByteBuffer content, boolean lastContent, Callback callback)
295 {
296 new ContentCallback(content,lastContent,callback).iterate();
297 }
298
299 @Override
300 public void completed()
301 {
302
303 if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
304 {
305 Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
306 if (connection != null)
307 {
308 LOG.debug("Upgrade from {} to {}", this, connection);
309 onClose();
310 getEndPoint().setConnection(connection);
311 connection.onOpen();
312 _channel.reset();
313 _parser.reset();
314 _generator.reset();
315 releaseRequestBuffer();
316 return;
317 }
318 }
319
320
321
322 if (_channel.isExpecting100Continue())
323
324 _parser.close();
325 else if (_parser.inContentState() && _generator.isPersistent())
326
327 _channel.getRequest().getHttpInput().consumeAll();
328
329
330 _channel.reset();
331 if (_generator.isPersistent() && !_parser.isClosed())
332 _parser.reset();
333 else
334 _parser.close();
335 releaseRequestBuffer();
336 if (_chunk!=null)
337 _bufferPool.release(_chunk);
338 _chunk=null;
339 _generator.reset();
340
341
342 if (getCurrentConnection()!=this)
343 {
344
345 if (_parser.isStart())
346 {
347
348 if (_requestBuffer == null)
349 {
350
351 fillInterested();
352 }
353
354 else if (getConnector().isRunning())
355 {
356
357 try
358 {
359 getExecutor().execute(this);
360 }
361 catch (RejectedExecutionException e)
362 {
363 if (getConnector().isRunning())
364 LOG.warn(e);
365 else
366 LOG.ignore(e);
367 getEndPoint().close();
368 }
369 }
370 else
371 {
372 getEndPoint().close();
373 }
374 }
375
376 else if (getEndPoint().isOpen())
377 fillInterested();
378 }
379 }
380
381 protected class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
382 {
383 public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
384 {
385 super(connector,config,endPoint,transport,input);
386 }
387
388 @Override
389 public void earlyEOF()
390 {
391
392 if (getRequest().getMethod()==null)
393 close();
394 else
395 super.earlyEOF();
396 }
397
398 @Override
399 public boolean content(ByteBuffer item)
400 {
401 super.content(item);
402 return true;
403 }
404
405 @Override
406 public void badMessage(int status, String reason)
407 {
408 _generator.setPersistent(false);
409 super.badMessage(status,reason);
410 }
411
412 @Override
413 public boolean headerComplete()
414 {
415 boolean persistent;
416 HttpVersion version = getHttpVersion();
417
418 switch (version)
419 {
420 case HTTP_0_9:
421 {
422 persistent = false;
423 break;
424 }
425 case HTTP_1_0:
426 {
427 persistent = getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString());
428 if (!persistent)
429 persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
430 if (persistent)
431 getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE);
432 break;
433 }
434 case HTTP_1_1:
435 {
436 persistent = !getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
437 if (!persistent)
438 persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
439 if (!persistent)
440 getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
441 break;
442 }
443 default:
444 {
445 throw new IllegalStateException();
446 }
447 }
448
449 if (!persistent)
450 _generator.setPersistent(false);
451
452 return super.headerComplete();
453 }
454
455 @Override
456 protected void handleException(Throwable x)
457 {
458 _generator.setPersistent(false);
459 super.handleException(x);
460 }
461
462 @Override
463 public void failed()
464 {
465 getEndPoint().shutdownOutput();
466 }
467 }
468
469 private class CommitCallback extends IteratingNestedCallback
470 {
471 final ByteBuffer _content;
472 final boolean _lastContent;
473 final ResponseInfo _info;
474 ByteBuffer _header;
475
476 CommitCallback(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
477 {
478 super(callback);
479 _info=info;
480 _content=content;
481 _lastContent=last;
482 }
483
484 @Override
485 public boolean process() throws Exception
486 {
487 ByteBuffer chunk = _chunk;
488 while (true)
489 {
490 HttpGenerator.Result result = _generator.generateResponse(_info, _header, chunk, _content, _lastContent);
491 if (LOG.isDebugEnabled())
492 LOG.debug("{} generate: {} ({},{},{})@{}",
493 this,
494 result,
495 BufferUtil.toSummaryString(_header),
496 BufferUtil.toSummaryString(_content),
497 _lastContent,
498 _generator.getState());
499
500 switch (result)
501 {
502 case NEED_HEADER:
503 {
504
505 if (_lastContent && _content!=null && !_content.isReadOnly() && _content.hasArray() && BufferUtil.space(_content)>_config.getResponseHeaderSize() )
506 {
507
508 int p=_content.position();
509 int l=_content.limit();
510 _content.position(l);
511 _content.limit(l+_config.getResponseHeaderSize());
512 _header=_content.slice();
513 _header.limit(0);
514 _content.position(p);
515 _content.limit(l);
516 }
517 else
518 _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
519 continue;
520 }
521 case NEED_CHUNK:
522 {
523 chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
524 continue;
525 }
526 case FLUSH:
527 {
528
529 if (_channel.getRequest().isHead())
530 {
531 BufferUtil.clear(chunk);
532 BufferUtil.clear(_content);
533 }
534
535
536 if (BufferUtil.hasContent(_header))
537 {
538 if (BufferUtil.hasContent(_content))
539 {
540 if (BufferUtil.hasContent(chunk))
541 getEndPoint().write(this, _header, chunk, _content);
542 else
543 getEndPoint().write(this, _header, _content);
544 }
545 else
546 getEndPoint().write(this, _header);
547 }
548 else if (BufferUtil.hasContent(chunk))
549 {
550 if (BufferUtil.hasContent(_content))
551 getEndPoint().write(this, chunk, _content);
552 else
553 getEndPoint().write(this, chunk);
554 }
555 else if (BufferUtil.hasContent(_content))
556 {
557 getEndPoint().write(this, _content);
558 }
559 else
560 continue;
561 return false;
562 }
563 case SHUTDOWN_OUT:
564 {
565 getEndPoint().shutdownOutput();
566 continue;
567 }
568 case DONE:
569 {
570 if (_header!=null)
571 {
572
573 if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() || _content.array()!=_header.array())
574 _bufferPool.release(_header);
575 }
576 return true;
577 }
578 case CONTINUE:
579 {
580 break;
581 }
582 default:
583 {
584 throw new IllegalStateException("generateResponse="+result);
585 }
586 }
587 }
588 }
589 }
590
591 private class ContentCallback extends IteratingNestedCallback
592 {
593 final ByteBuffer _content;
594 final boolean _lastContent;
595
596 ContentCallback(ByteBuffer content, boolean last, Callback callback)
597 {
598 super(callback);
599 _content=content;
600 _lastContent=last;
601 }
602
603 @Override
604 public boolean process() throws Exception
605 {
606 ByteBuffer chunk = _chunk;
607 while (true)
608 {
609 HttpGenerator.Result result = _generator.generateResponse(null, null, chunk, _content, _lastContent);
610 if (LOG.isDebugEnabled())
611 LOG.debug("{} generate: {} ({},{})@{}",
612 this,
613 result,
614 BufferUtil.toSummaryString(_content),
615 _lastContent,
616 _generator.getState());
617
618 switch (result)
619 {
620 case NEED_HEADER:
621 throw new IllegalStateException();
622 case NEED_CHUNK:
623 {
624 chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
625 continue;
626 }
627 case FLUSH:
628 {
629
630 if (_channel.getRequest().isHead())
631 {
632 BufferUtil.clear(chunk);
633 BufferUtil.clear(_content);
634 continue;
635 }
636 else if (BufferUtil.hasContent(chunk))
637 {
638 if (BufferUtil.hasContent(_content))
639 getEndPoint().write(this, chunk, _content);
640 else
641 getEndPoint().write(this, chunk);
642 }
643 else if (BufferUtil.hasContent(_content))
644 {
645 getEndPoint().write(this, _content);
646 }
647 else
648 continue;
649 return false;
650 }
651 case SHUTDOWN_OUT:
652 {
653 getEndPoint().shutdownOutput();
654 continue;
655 }
656 case DONE:
657 {
658 return true;
659 }
660 case CONTINUE:
661 {
662 break;
663 }
664 default:
665 {
666 throw new IllegalStateException("generateResponse="+result);
667 }
668 }
669 }
670 }
671 }
672
673 }