1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.ByteChannel;
24 import java.nio.channels.ClosedChannelException;
25 import java.util.concurrent.RejectedExecutionException;
26
27 import org.eclipse.jetty.http.HttpGenerator;
28 import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
29 import org.eclipse.jetty.http.HttpHeader;
30 import org.eclipse.jetty.http.HttpHeaderValue;
31 import org.eclipse.jetty.http.HttpMethod;
32 import org.eclipse.jetty.http.HttpParser;
33 import org.eclipse.jetty.http.HttpStatus;
34 import org.eclipse.jetty.http.HttpVersion;
35 import org.eclipse.jetty.io.AbstractConnection;
36 import org.eclipse.jetty.io.ByteBufferPool;
37 import org.eclipse.jetty.io.Connection;
38 import org.eclipse.jetty.io.EndPoint;
39 import org.eclipse.jetty.io.EofException;
40 import org.eclipse.jetty.util.BlockingCallback;
41 import org.eclipse.jetty.util.BufferUtil;
42 import org.eclipse.jetty.util.Callback;
43 import org.eclipse.jetty.util.IteratingCallback;
44 import org.eclipse.jetty.util.log.Log;
45 import org.eclipse.jetty.util.log.Logger;
46
47
48
49
50 public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport
51 {
52 public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
53 private static final boolean REQUEST_BUFFER_DIRECT=false;
54 private static final boolean HEADER_BUFFER_DIRECT=false;
55 private static final boolean CHUNK_BUFFER_DIRECT=false;
56 private static final Logger LOG = Log.getLogger(HttpConnection.class);
57 private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
58
59 private final HttpConfiguration _config;
60 private final Connector _connector;
61 private final ByteBufferPool _bufferPool;
62 private final HttpGenerator _generator;
63 private final HttpChannelOverHttp _channel;
64 private final HttpParser _parser;
65 private volatile ByteBuffer _requestBuffer = null;
66 private volatile ByteBuffer _chunk = null;
67 private BlockingCallback _readBlocker = new BlockingCallback();
68 private BlockingCallback _writeBlocker = new BlockingCallback();
69
70
71 public static HttpConnection getCurrentConnection()
72 {
73 return __currentConnection.get();
74 }
75
76 protected static void setCurrentConnection(HttpConnection connection)
77 {
78 __currentConnection.set(connection);
79 }
80
81 public HttpConfiguration getHttpConfiguration()
82 {
83 return _config;
84 }
85
86 public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
87 {
88
89
90 super(endPoint, connector.getExecutor(),true);
91
92 _config = config;
93 _connector = connector;
94 _bufferPool = _connector.getByteBufferPool();
95 _generator = newHttpGenerator();
96 HttpInput<ByteBuffer> input = newHttpInput();
97 _channel = newHttpChannel(input);
98 _parser = newHttpParser();
99
100 LOG.debug("New HTTP Connection {}", this);
101 }
102
103 protected HttpGenerator newHttpGenerator()
104 {
105 return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
106 }
107
108 protected HttpInput<ByteBuffer> newHttpInput()
109 {
110 return new Input();
111 }
112
113 protected HttpChannelOverHttp newHttpChannel(HttpInput<ByteBuffer> httpInput)
114 {
115 return new HttpChannelOverHttp(_connector, _config, getEndPoint(), this, httpInput);
116 }
117
118 protected HttpParser newHttpParser()
119 {
120 return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
121 }
122
123 protected HttpParser.RequestHandler<ByteBuffer> newRequestHandler()
124 {
125 return _channel;
126 }
127
128 public Server getServer()
129 {
130 return _connector.getServer();
131 }
132
133 public Connector getConnector()
134 {
135 return _connector;
136 }
137
138 public HttpChannel<?> getHttpChannel()
139 {
140 return _channel;
141 }
142
143 public void reset()
144 {
145
146 if (_channel.isExpecting100Continue())
147 {
148
149 _parser.reset();
150
151 _parser.close();
152 if (getEndPoint().isOpen())
153 fillInterested();
154 }
155
156 else if (_generator.isPersistent())
157
158 _parser.reset();
159 else
160 {
161
162 _parser.close();
163 if (getEndPoint().isOpen())
164 fillInterested();
165 }
166
167 _generator.reset();
168 _channel.reset();
169
170 releaseRequestBuffer();
171 if (_chunk!=null)
172 {
173 _bufferPool.release(_chunk);
174 _chunk=null;
175 }
176 }
177
178
179 @Override
180 public int getMessagesIn()
181 {
182 return getHttpChannel().getRequests();
183 }
184
185 @Override
186 public int getMessagesOut()
187 {
188 return getHttpChannel().getRequests();
189 }
190
191 @Override
192 public String toString()
193 {
194 return String.format("%s,g=%s,p=%s",
195 super.toString(),
196 _generator,
197 _parser);
198 }
199
200 private void releaseRequestBuffer()
201 {
202 if (_requestBuffer != null && !_requestBuffer.hasRemaining())
203 {
204 ByteBuffer buffer=_requestBuffer;
205 _requestBuffer=null;
206 _bufferPool.release(buffer);
207 }
208 }
209
210
211
212
213
214
215
216
217
218 @Override
219 public void onFillable()
220 {
221 LOG.debug("{} onFillable {}", this, _channel.getState());
222
223 setCurrentConnection(this);
224 try
225 {
226 while (true)
227 {
228
229 boolean call_channel=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
230
231
232 if (call_channel)
233 {
234
235
236
237 while (_parser.inContentState())
238 {
239 if (!_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
240 break;
241 }
242
243
244
245
246 boolean handle=_channel.handle();
247
248
249 if (!handle || getEndPoint().getConnection()!=this)
250 return;
251 }
252 else if (BufferUtil.isEmpty(_requestBuffer))
253 {
254 if (_requestBuffer == null)
255 _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
256
257 int filled = getEndPoint().fill(_requestBuffer);
258 if (filled==0)
259 filled = getEndPoint().fill(_requestBuffer);
260
261 LOG.debug("{} filled {}", this, filled);
262
263
264 if (filled == 0)
265 {
266
267 releaseRequestBuffer();
268 fillInterested();
269 return;
270 }
271 else if (filled < 0)
272 {
273 _parser.shutdownInput();
274
275
276
277
278 if (getEndPoint().isOutputShutdown())
279 getEndPoint().close();
280 else
281 getEndPoint().shutdownOutput();
282
283 releaseRequestBuffer();
284 return;
285 }
286 }
287 else
288 {
289
290 LOG.warn("Unexpected state: "+this+ " "+_channel+" "+_channel.getRequest());
291 if (!_channel.getState().isSuspended())
292 getEndPoint().close();
293 return;
294 }
295 }
296 }
297 catch (EofException e)
298 {
299 LOG.debug(e);
300 }
301 catch (Exception e)
302 {
303 if (_parser.isIdle())
304 LOG.debug(e);
305 else
306 LOG.warn(this.toString(), e);
307 close();
308 }
309 finally
310 {
311 setCurrentConnection(null);
312 }
313 }
314
315 @Override
316 public void onOpen()
317 {
318 super.onOpen();
319 fillInterested();
320 }
321
322 @Override
323 public void run()
324 {
325 onFillable();
326 }
327
328 @Override
329 public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
330 {
331 try
332 {
333 if (info==null)
334 new ContentCallback(content,lastContent,_writeBlocker).iterate();
335 else
336 {
337
338 if (_channel.isExpecting100Continue())
339
340 _generator.setPersistent(false);
341 new CommitCallback(info,content,lastContent,_writeBlocker).iterate();
342 }
343 _writeBlocker.block();
344 }
345 catch (ClosedChannelException e)
346 {
347 throw new EofException(e);
348 }
349 catch (IOException e)
350 {
351 throw e;
352 }
353 }
354
355 @Override
356 public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
357 {
358 if (info==null)
359 new ContentCallback(content,lastContent,callback).iterate();
360 else
361 {
362
363 if (_channel.isExpecting100Continue())
364
365 _generator.setPersistent(false);
366 new CommitCallback(info,content,lastContent,callback).iterate();
367 }
368 }
369
370 @Override
371 public void send(ByteBuffer content, boolean lastContent, Callback callback)
372 {
373 new ContentCallback(content,lastContent,callback).iterate();
374 }
375
376 @Override
377 public void completed()
378 {
379
380 if (_parser.isInContent() && _generator.isPersistent() && !_channel.isExpecting100Continue())
381
382 _channel.getRequest().getHttpInput().consumeAll();
383
384
385 if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
386 {
387 Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
388 if (connection != null)
389 {
390 LOG.debug("Upgrade from {} to {}", this, connection);
391 onClose();
392 getEndPoint().setConnection(connection);
393 connection.onOpen();
394 reset();
395 return;
396 }
397 }
398
399 reset();
400
401
402 if (getCurrentConnection()!=this)
403 {
404 if (_parser.isStart())
405 {
406
407 if (_requestBuffer == null)
408 {
409 fillInterested();
410 }
411 else if (getConnector().isStarted())
412 {
413 LOG.debug("{} pipelined", this);
414
415 try
416 {
417 getExecutor().execute(this);
418 }
419 catch (RejectedExecutionException e)
420 {
421 if (getConnector().isStarted())
422 LOG.warn(e);
423 else
424 LOG.ignore(e);
425 getEndPoint().close();
426 }
427 }
428 else
429 {
430 getEndPoint().close();
431 }
432 }
433 }
434 }
435
436 public ByteBuffer getRequestBuffer()
437 {
438 return _requestBuffer;
439 }
440
441 protected class Input extends ByteBufferHttpInput
442 {
443 @Override
444 protected void blockForContent() throws IOException
445 {
446
447
448
449
450
451 while (!_parser.isComplete())
452 {
453
454 boolean event=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
455
456
457
458 while (!event && BufferUtil.hasContent(_requestBuffer) && _parser.inContentState())
459 event=_parser.parseNext(_requestBuffer);
460
461
462 if (_parser.isComplete() || available()>0)
463 return;
464
465
466 if (BufferUtil.isEmpty(_requestBuffer))
467 {
468
469 if (getEndPoint().isInputShutdown())
470 {
471 _parser.shutdownInput();
472 shutdown();
473 return;
474 }
475
476
477 block(_readBlocker);
478 LOG.debug("{} block readable on {}",this,_readBlocker);
479 _readBlocker.block();
480
481
482 if (_requestBuffer==null)
483 {
484 long content_length=_channel.getRequest().getContentLength();
485 int size=getInputBufferSize();
486 if (size<content_length)
487 size=size*4;
488 _requestBuffer=_bufferPool.acquire(size,REQUEST_BUFFER_DIRECT);
489 }
490
491
492 int filled=getEndPoint().fill(_requestBuffer);
493 LOG.debug("{} block filled {}",this,filled);
494 if (filled<0)
495 {
496 _parser.shutdownInput();
497 return;
498 }
499 }
500 }
501 }
502
503 @Override
504 protected void onContentQueued(ByteBuffer ref)
505 {
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521 }
522
523 @Override
524 public void earlyEOF()
525 {
526 synchronized (lock())
527 {
528 _inputEOF=true;
529 _earlyEOF = true;
530 LOG.debug("{} early EOF", this);
531 }
532 }
533
534 @Override
535 public void shutdown()
536 {
537 synchronized (lock())
538 {
539 _inputEOF=true;
540 LOG.debug("{} shutdown", this);
541 }
542 }
543
544 @Override
545 protected void onAllContentConsumed()
546 {
547
548
549
550
551 releaseRequestBuffer();
552 }
553
554 @Override
555 public String toString()
556 {
557 return super.toString()+"{"+_channel+","+HttpConnection.this+"}";
558 }
559 }
560
561 protected class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
562 {
563 public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
564 {
565 super(connector,config,endPoint,transport,input);
566 }
567
568 @Override
569 public void badMessage(int status, String reason)
570 {
571 _generator.setPersistent(false);
572 super.badMessage(status,reason);
573 }
574
575 @Override
576 public boolean headerComplete()
577 {
578 boolean persistent;
579 HttpVersion version = getHttpVersion();
580
581 switch (version)
582 {
583 case HTTP_0_9:
584 {
585 persistent = false;
586 break;
587 }
588 case HTTP_1_0:
589 {
590 persistent = getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString());
591 if (!persistent)
592 persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
593 if (persistent)
594 getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE);
595 break;
596 }
597 case HTTP_1_1:
598 {
599 persistent = !getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
600 if (!persistent)
601 persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
602 if (!persistent)
603 getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
604 break;
605 }
606 default:
607 {
608 throw new IllegalStateException();
609 }
610 }
611
612 if (!persistent)
613 _generator.setPersistent(false);
614
615 return super.headerComplete();
616 }
617
618 @Override
619 protected void handleException(Throwable x)
620 {
621 _generator.setPersistent(false);
622 super.handleException(x);
623 }
624
625 @Override
626 public void failed()
627 {
628 getEndPoint().shutdownOutput();
629 }
630 }
631
632 private class CommitCallback extends IteratingCallback
633 {
634 final ByteBuffer _content;
635 final boolean _lastContent;
636 final ResponseInfo _info;
637 ByteBuffer _header;
638
639 CommitCallback(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
640 {
641 super(callback);
642 _info=info;
643 _content=content;
644 _lastContent=last;
645 }
646
647 @Override
648 public boolean process() throws Exception
649 {
650 ByteBuffer chunk = _chunk;
651 while (true)
652 {
653 HttpGenerator.Result result = _generator.generateResponse(_info, _header, chunk, _content, _lastContent);
654 if (LOG.isDebugEnabled())
655 LOG.debug("{} generate: {} ({},{},{})@{}",
656 this,
657 result,
658 BufferUtil.toSummaryString(_header),
659 BufferUtil.toSummaryString(_content),
660 _lastContent,
661 _generator.getState());
662
663 switch (result)
664 {
665 case NEED_HEADER:
666 {
667
668 if (_lastContent && _content!=null && !_content.isReadOnly() && _content.hasArray() && BufferUtil.space(_content)>_config.getResponseHeaderSize() )
669 {
670
671 int p=_content.position();
672 int l=_content.limit();
673 _content.position(l);
674 _content.limit(l+_config.getResponseHeaderSize());
675 _header=_content.slice();
676 _header.limit(0);
677 _content.position(p);
678 _content.limit(l);
679 }
680 else
681 _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
682 continue;
683 }
684 case NEED_CHUNK:
685 {
686 chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
687 continue;
688 }
689 case FLUSH:
690 {
691
692 if (_channel.getRequest().isHead())
693 {
694 BufferUtil.clear(chunk);
695 BufferUtil.clear(_content);
696 }
697
698
699 if (BufferUtil.hasContent(_header))
700 {
701 if (BufferUtil.hasContent(_content))
702 {
703 if (BufferUtil.hasContent(chunk))
704 getEndPoint().write(this, _header, chunk, _content);
705 else
706 getEndPoint().write(this, _header, _content);
707 }
708 else
709 getEndPoint().write(this, _header);
710 }
711 else if (BufferUtil.hasContent(chunk))
712 {
713 if (BufferUtil.hasContent(_content))
714 getEndPoint().write(this, chunk, _content);
715 else
716 getEndPoint().write(this, chunk);
717 }
718 else if (BufferUtil.hasContent(_content))
719 {
720 getEndPoint().write(this, _content);
721 }
722 else
723 continue;
724 return false;
725 }
726 case SHUTDOWN_OUT:
727 {
728 getEndPoint().shutdownOutput();
729 continue;
730 }
731 case DONE:
732 {
733 if (_header!=null)
734 {
735
736 if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() || _content.array()!=_header.array())
737 _bufferPool.release(_header);
738 }
739 return true;
740 }
741 case CONTINUE:
742 {
743 break;
744 }
745 default:
746 {
747 throw new IllegalStateException("generateResponse="+result);
748 }
749 }
750 }
751 }
752 }
753
754 private class ContentCallback extends IteratingCallback
755 {
756 final ByteBuffer _content;
757 final boolean _lastContent;
758
759 ContentCallback(ByteBuffer content, boolean last, Callback callback)
760 {
761 super(callback);
762 _content=content;
763 _lastContent=last;
764 }
765
766 @Override
767 public boolean process() throws Exception
768 {
769 ByteBuffer chunk = _chunk;
770 while (true)
771 {
772 HttpGenerator.Result result = _generator.generateResponse(null, null, chunk, _content, _lastContent);
773 if (LOG.isDebugEnabled())
774 LOG.debug("{} generate: {} ({},{})@{}",
775 this,
776 result,
777 BufferUtil.toSummaryString(_content),
778 _lastContent,
779 _generator.getState());
780
781 switch (result)
782 {
783 case NEED_HEADER:
784 throw new IllegalStateException();
785 case NEED_CHUNK:
786 {
787 chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
788 continue;
789 }
790 case FLUSH:
791 {
792
793 if (_channel.getRequest().isHead())
794 {
795 BufferUtil.clear(chunk);
796 BufferUtil.clear(_content);
797 continue;
798 }
799 else if (BufferUtil.hasContent(chunk))
800 {
801 if (BufferUtil.hasContent(_content))
802 getEndPoint().write(this, chunk, _content);
803 else
804 getEndPoint().write(this, chunk);
805 }
806 else if (BufferUtil.hasContent(_content))
807 {
808 getEndPoint().write(this, _content);
809 }
810 else
811 continue;
812 return false;
813 }
814 case SHUTDOWN_OUT:
815 {
816 getEndPoint().shutdownOutput();
817 continue;
818 }
819 case DONE:
820 {
821 return true;
822 }
823 case CONTINUE:
824 {
825 break;
826 }
827 default:
828 {
829 throw new IllegalStateException("generateResponse="+result);
830 }
831 }
832 }
833 }
834 }
835 }