1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.ClosedChannelException;
24 import java.util.concurrent.RejectedExecutionException;
25
26 import org.eclipse.jetty.http.HttpGenerator;
27 import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
28 import org.eclipse.jetty.http.HttpHeader;
29 import org.eclipse.jetty.http.HttpHeaderValue;
30 import org.eclipse.jetty.http.HttpMethod;
31 import org.eclipse.jetty.http.HttpParser;
32 import org.eclipse.jetty.http.HttpStatus;
33 import org.eclipse.jetty.http.HttpVersion;
34 import org.eclipse.jetty.io.AbstractConnection;
35 import org.eclipse.jetty.io.ByteBufferPool;
36 import org.eclipse.jetty.io.Connection;
37 import org.eclipse.jetty.io.EndPoint;
38 import org.eclipse.jetty.io.EofException;
39 import org.eclipse.jetty.util.BlockingCallback;
40 import org.eclipse.jetty.util.BufferUtil;
41 import org.eclipse.jetty.util.Callback;
42 import org.eclipse.jetty.util.IteratingCallback;
43 import org.eclipse.jetty.util.log.Log;
44 import org.eclipse.jetty.util.log.Logger;
45
46
47
48
49 public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport
50 {
51 public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
52 private static final boolean REQUEST_BUFFER_DIRECT=false;
53 private static final boolean HEADER_BUFFER_DIRECT=false;
54 private static final boolean CHUNK_BUFFER_DIRECT=false;
55 private static final Logger LOG = Log.getLogger(HttpConnection.class);
56 private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
57
58 private final HttpConfiguration _config;
59 private final Connector _connector;
60 private final ByteBufferPool _bufferPool;
61 private final HttpGenerator _generator;
62 private final HttpChannelOverHttp _channel;
63 private final HttpParser _parser;
64 private volatile ByteBuffer _requestBuffer = null;
65 private volatile ByteBuffer _chunk = null;
66 private BlockingCallback _readBlocker = new BlockingCallback();
67 private BlockingCallback _writeBlocker = new BlockingCallback();
68
69
70 public static HttpConnection getCurrentConnection()
71 {
72 return __currentConnection.get();
73 }
74
75 protected static void setCurrentConnection(HttpConnection connection)
76 {
77 __currentConnection.set(connection);
78 }
79
80 public HttpConfiguration getHttpConfiguration()
81 {
82 return _config;
83 }
84
/**
 * Creates a connection that runs on the connector's executor and takes its buffers
 * from the connector's buffer pool.
 *
 * @param config    the HTTP configuration (header sizes, server version flags)
 * @param connector the connector that owns this connection
 * @param endPoint  the endpoint this connection reads from and writes to
 */
public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
{
    super(endPoint, connector.getExecutor(), true);

    this._config = config;
    this._connector = connector;
    this._bufferPool = connector.getByteBufferPool();
    this._generator = new HttpGenerator(config.getSendServerVersion(), config.getSendXPoweredBy());
    this._channel = new HttpChannelOverHttp(connector, config, endPoint, this, new Input());
    // The parser must be created after _channel is assigned: newRequestHandler() returns _channel.
    this._parser = newHttpParser();

    LOG.debug("New HTTP Connection {}", this);
}
100
101 protected HttpParser newHttpParser()
102 {
103 return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
104 }
105
106 protected HttpParser.RequestHandler<ByteBuffer> newRequestHandler()
107 {
108 return _channel;
109 }
110
111 public Server getServer()
112 {
113 return _connector.getServer();
114 }
115
116 public Connector getConnector()
117 {
118 return _connector;
119 }
120
121 public HttpChannel<?> getHttpChannel()
122 {
123 return _channel;
124 }
125
126 public void reset()
127 {
128
129 if (_channel.isExpecting100Continue())
130 {
131
132 _parser.reset();
133
134 _parser.close();
135 if (getEndPoint().isOpen())
136 fillInterested();
137 }
138
139 else if (_generator.isPersistent())
140
141 _parser.reset();
142 else
143 {
144
145 _parser.close();
146 if (getEndPoint().isOpen())
147 fillInterested();
148 }
149
150 _generator.reset();
151 _channel.reset();
152
153 releaseRequestBuffer();
154 if (_chunk!=null)
155 {
156 _bufferPool.release(_chunk);
157 _chunk=null;
158 }
159 }
160
161
162 @Override
163 public int getMessagesIn()
164 {
165 return getHttpChannel().getRequests();
166 }
167
168 @Override
169 public int getMessagesOut()
170 {
171 return getHttpChannel().getRequests();
172 }
173
174 @Override
175 public String toString()
176 {
177 return String.format("%s,g=%s,p=%s",
178 super.toString(),
179 _generator,
180 _parser);
181 }
182
183 private void releaseRequestBuffer()
184 {
185 if (_requestBuffer != null && !_requestBuffer.hasRemaining())
186 {
187 ByteBuffer buffer=_requestBuffer;
188 _requestBuffer=null;
189 _bufferPool.release(buffer);
190 }
191 }
192
193
194
195
196
197
198
199
200
201 @Override
202 public void onFillable()
203 {
204 LOG.debug("{} onFillable {}", this, _channel.getState());
205
206 setCurrentConnection(this);
207 try
208 {
209 while (true)
210 {
211
212 boolean call_channel=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
213
214
215 if (call_channel)
216 {
217
218
219
220 while (_parser.inContentState())
221 {
222 if (!_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
223 break;
224 }
225
226
227
228
229 boolean handle=_channel.handle();
230
231
232 if (!handle || getEndPoint().getConnection()!=this)
233 return;
234 }
235 else if (BufferUtil.isEmpty(_requestBuffer))
236 {
237 if (_requestBuffer == null)
238 _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
239
240 int filled = getEndPoint().fill(_requestBuffer);
241 if (filled==0)
242 filled = getEndPoint().fill(_requestBuffer);
243
244 LOG.debug("{} filled {}", this, filled);
245
246
247 if (filled == 0)
248 {
249
250 releaseRequestBuffer();
251 fillInterested();
252 return;
253 }
254 else if (filled < 0)
255 {
256 _parser.shutdownInput();
257
258
259
260
261 if (getEndPoint().isOutputShutdown())
262 getEndPoint().close();
263 else
264 getEndPoint().shutdownOutput();
265
266 releaseRequestBuffer();
267 return;
268 }
269 }
270 else
271 {
272
273 LOG.warn("Unexpected state: "+this+ " "+_channel+" "+_channel.getRequest());
274 if (!_channel.getState().isSuspended())
275 getEndPoint().close();
276 return;
277 }
278 }
279 }
280 catch (EofException e)
281 {
282 LOG.debug(e);
283 }
284 catch (Exception e)
285 {
286 if (_parser.isIdle())
287 LOG.debug(e);
288 else
289 LOG.warn(this.toString(), e);
290 close();
291 }
292 finally
293 {
294 setCurrentConnection(null);
295 }
296 }
297
298 @Override
299 public void onOpen()
300 {
301 super.onOpen();
302 fillInterested();
303 }
304
305 @Override
306 public void run()
307 {
308 onFillable();
309 }
310
311 @Override
312 public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
313 {
314 try
315 {
316 if (info==null)
317 new ContentCallback(content,lastContent,_writeBlocker).iterate();
318 else
319 {
320
321 if (_channel.isExpecting100Continue())
322
323 _generator.setPersistent(false);
324 new CommitCallback(info,content,lastContent,_writeBlocker).iterate();
325 }
326 _writeBlocker.block();
327 }
328 catch (ClosedChannelException e)
329 {
330 throw new EofException(e);
331 }
332 catch (IOException e)
333 {
334 throw e;
335 }
336 }
337
338 @Override
339 public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
340 {
341 if (info==null)
342 new ContentCallback(content,lastContent,callback).iterate();
343 else
344 {
345
346 if (_channel.isExpecting100Continue())
347
348 _generator.setPersistent(false);
349 new CommitCallback(info,content,lastContent,callback).iterate();
350 }
351 }
352
353 @Override
354 public void send(ByteBuffer content, boolean lastContent, Callback callback)
355 {
356 new ContentCallback(content,lastContent,callback).iterate();
357 }
358
359 @Override
360 public void completed()
361 {
362
363 if (_parser.isInContent() && _generator.isPersistent() && !_channel.isExpecting100Continue())
364
365 _channel.getRequest().getHttpInput().consumeAll();
366
367
368 if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
369 {
370 Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
371 if (connection != null)
372 {
373 LOG.debug("Upgrade from {} to {}", this, connection);
374 onClose();
375 getEndPoint().setConnection(connection);
376 connection.onOpen();
377 reset();
378 return;
379 }
380 }
381
382 reset();
383
384
385 if (getCurrentConnection()!=this)
386 {
387 if (_parser.isStart())
388 {
389
390 if (_requestBuffer == null)
391 {
392 fillInterested();
393 }
394 else if (getConnector().isStarted())
395 {
396 LOG.debug("{} pipelined", this);
397
398 try
399 {
400 getExecutor().execute(this);
401 }
402 catch (RejectedExecutionException e)
403 {
404 if (getConnector().isStarted())
405 LOG.warn(e);
406 else
407 LOG.ignore(e);
408 getEndPoint().close();
409 }
410 }
411 else
412 {
413 getEndPoint().close();
414 }
415 }
416 }
417 }
418
419 public ByteBuffer getRequestBuffer()
420 {
421 return _requestBuffer;
422 }
423
424 private class Input extends ByteBufferHttpInput
425 {
426 @Override
427 protected void blockForContent() throws IOException
428 {
429
430
431
432
433
434 while (!_parser.isComplete())
435 {
436
437 boolean event=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
438
439
440
441 while (!event && BufferUtil.hasContent(_requestBuffer) && _parser.inContentState())
442 event=_parser.parseNext(_requestBuffer);
443
444
445 if (_parser.isComplete() || available()>0)
446 return;
447
448
449 if (BufferUtil.isEmpty(_requestBuffer))
450 {
451
452 if (getEndPoint().isInputShutdown())
453 {
454 _parser.shutdownInput();
455 shutdown();
456 return;
457 }
458
459
460 block(_readBlocker);
461 LOG.debug("{} block readable on {}",this,_readBlocker);
462 _readBlocker.block();
463
464
465 if (_requestBuffer==null)
466 {
467 long content_length=_channel.getRequest().getContentLength();
468 int size=getInputBufferSize();
469 if (size<content_length)
470 size=size*4;
471 _requestBuffer=_bufferPool.acquire(size,REQUEST_BUFFER_DIRECT);
472 }
473
474
475 int filled=getEndPoint().fill(_requestBuffer);
476 LOG.debug("{} block filled {}",this,filled);
477 if (filled<0)
478 {
479 _parser.shutdownInput();
480 return;
481 }
482 }
483 }
484 }
485
486 @Override
487 protected void onContentQueued(ByteBuffer ref)
488 {
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504 }
505
506 @Override
507 public void earlyEOF()
508 {
509 synchronized (lock())
510 {
511 _inputEOF=true;
512 _earlyEOF = true;
513 LOG.debug("{} early EOF", this);
514 }
515 }
516
517 @Override
518 public void shutdown()
519 {
520 synchronized (lock())
521 {
522 _inputEOF=true;
523 LOG.debug("{} shutdown", this);
524 }
525 }
526
527 @Override
528 protected void onAllContentConsumed()
529 {
530
531
532
533
534 releaseRequestBuffer();
535 }
536
537 @Override
538 public String toString()
539 {
540 return super.toString()+"{"+_channel+","+HttpConnection.this+"}";
541 }
542 }
543
544 private class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
545 {
546 public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
547 {
548 super(connector,config,endPoint,transport,input);
549 }
550
551 @Override
552 public void badMessage(int status, String reason)
553 {
554 _generator.setPersistent(false);
555 super.badMessage(status,reason);
556 }
557
558 @Override
559 public boolean headerComplete()
560 {
561 boolean persistent;
562 HttpVersion version = getHttpVersion();
563
564 switch (version)
565 {
566 case HTTP_0_9:
567 {
568 persistent = false;
569 break;
570 }
571 case HTTP_1_0:
572 {
573 persistent = getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString());
574 if (!persistent)
575 persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
576 if (persistent)
577 getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE);
578 break;
579 }
580 case HTTP_1_1:
581 {
582 persistent = !getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
583 if (!persistent)
584 persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
585 if (!persistent)
586 getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
587 break;
588 }
589 default:
590 {
591 throw new IllegalStateException();
592 }
593 }
594
595 if (!persistent)
596 _generator.setPersistent(false);
597
598 return super.headerComplete();
599 }
600
601 @Override
602 protected void handleException(Throwable x)
603 {
604 _generator.setPersistent(false);
605 super.handleException(x);
606 }
607
608 @Override
609 public void failed()
610 {
611 getEndPoint().shutdownOutput();
612 }
613 }
614
615 private class CommitCallback extends IteratingCallback
616 {
617 final ByteBuffer _content;
618 final boolean _lastContent;
619 final ResponseInfo _info;
620 ByteBuffer _header;
621
622 CommitCallback(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
623 {
624 super(callback);
625 _info=info;
626 _content=content;
627 _lastContent=last;
628 }
629
630 @Override
631 public boolean process() throws Exception
632 {
633 ByteBuffer chunk = _chunk;
634 while (true)
635 {
636 HttpGenerator.Result result = _generator.generateResponse(_info, _header, chunk, _content, _lastContent);
637 if (LOG.isDebugEnabled())
638 LOG.debug("{} generate: {} ({},{},{})@{}",
639 this,
640 result,
641 BufferUtil.toSummaryString(_header),
642 BufferUtil.toSummaryString(_content),
643 _lastContent,
644 _generator.getState());
645
646 switch (result)
647 {
648 case NEED_HEADER:
649 {
650 if (_lastContent && _content!=null && BufferUtil.space(_content)>_config.getResponseHeaderSize() && _content.hasArray() )
651 {
652
653 int p=_content.position();
654 int l=_content.limit();
655 _content.position(l);
656 _content.limit(l+_config.getResponseHeaderSize());
657 _header=_content.slice();
658 _header.limit(0);
659 _content.position(p);
660 _content.limit(l);
661 }
662 else
663 _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
664 continue;
665 }
666 case NEED_CHUNK:
667 {
668 chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
669 continue;
670 }
671 case FLUSH:
672 {
673
674 if (_channel.getRequest().isHead())
675 {
676 BufferUtil.clear(chunk);
677 BufferUtil.clear(_content);
678 }
679
680
681 if (BufferUtil.hasContent(_header))
682 {
683 if (BufferUtil.hasContent(_content))
684 {
685 if (BufferUtil.hasContent(chunk))
686 getEndPoint().write(this, _header, chunk, _content);
687 else
688 getEndPoint().write(this, _header, _content);
689 }
690 else
691 getEndPoint().write(this, _header);
692 }
693 else if (BufferUtil.hasContent(chunk))
694 {
695 if (BufferUtil.hasContent(_content))
696 getEndPoint().write(this, chunk, _content);
697 else
698 getEndPoint().write(this, chunk);
699 }
700 else if (BufferUtil.hasContent(_content))
701 {
702 getEndPoint().write(this, _content);
703 }
704 else
705 continue;
706 return false;
707 }
708 case SHUTDOWN_OUT:
709 {
710 getEndPoint().shutdownOutput();
711 continue;
712 }
713 case DONE:
714 {
715 if (_header!=null)
716 {
717
718 if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() || _content.array()!=_header.array())
719 _bufferPool.release(_header);
720 }
721 return true;
722 }
723 case CONTINUE:
724 {
725 break;
726 }
727 default:
728 {
729 throw new IllegalStateException("generateResponse="+result);
730 }
731 }
732 }
733 }
734 }
735
736 private class ContentCallback extends IteratingCallback
737 {
738 final ByteBuffer _content;
739 final boolean _lastContent;
740
741 ContentCallback(ByteBuffer content, boolean last, Callback callback)
742 {
743 super(callback);
744 _content=content;
745 _lastContent=last;
746 }
747
748 @Override
749 public boolean process() throws Exception
750 {
751 ByteBuffer chunk = _chunk;
752 while (true)
753 {
754 HttpGenerator.Result result = _generator.generateResponse(null, null, chunk, _content, _lastContent);
755 if (LOG.isDebugEnabled())
756 LOG.debug("{} generate: {} ({},{})@{}",
757 this,
758 result,
759 BufferUtil.toSummaryString(_content),
760 _lastContent,
761 _generator.getState());
762
763 switch (result)
764 {
765 case NEED_HEADER:
766 throw new IllegalStateException();
767 case NEED_CHUNK:
768 {
769 chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
770 continue;
771 }
772 case FLUSH:
773 {
774
775 if (_channel.getRequest().isHead())
776 {
777 BufferUtil.clear(chunk);
778 BufferUtil.clear(_content);
779 continue;
780 }
781 else if (BufferUtil.hasContent(chunk))
782 {
783 if (BufferUtil.hasContent(_content))
784 getEndPoint().write(this, chunk, _content);
785 else
786 getEndPoint().write(this, chunk);
787 }
788 else if (BufferUtil.hasContent(_content))
789 {
790 getEndPoint().write(this, _content);
791 }
792 else
793 continue;
794 return false;
795 }
796 case SHUTDOWN_OUT:
797 {
798 getEndPoint().shutdownOutput();
799 continue;
800 }
801 case DONE:
802 {
803 return true;
804 }
805 case CONTINUE:
806 {
807 break;
808 }
809 default:
810 {
811 throw new IllegalStateException("generateResponse="+result);
812 }
813 }
814 }
815 }
816 }
817 }