1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.WritePendingException;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import org.eclipse.jetty.http.HttpField;
28 import org.eclipse.jetty.http.HttpGenerator;
29 import org.eclipse.jetty.http.HttpHeader;
30 import org.eclipse.jetty.http.HttpHeaderValue;
31 import org.eclipse.jetty.http.HttpParser;
32 import org.eclipse.jetty.http.HttpParser.RequestHandler;
33 import org.eclipse.jetty.http.HttpStatus;
34 import org.eclipse.jetty.http.MetaData;
35 import org.eclipse.jetty.http.PreEncodedHttpField;
36 import org.eclipse.jetty.io.AbstractConnection;
37 import org.eclipse.jetty.io.ByteBufferPool;
38 import org.eclipse.jetty.io.Connection;
39 import org.eclipse.jetty.io.EndPoint;
40 import org.eclipse.jetty.io.EofException;
41 import org.eclipse.jetty.util.BufferUtil;
42 import org.eclipse.jetty.util.Callback;
43 import org.eclipse.jetty.util.IteratingCallback;
44 import org.eclipse.jetty.util.log.Log;
45 import org.eclipse.jetty.util.log.Logger;
46
47
48
49
50 public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom
51 {
52 private static final Logger LOG = Log.getLogger(HttpConnection.class);
53 public static final HttpField CONNECTION_CLOSE = new PreEncodedHttpField(HttpHeader.CONNECTION,HttpHeaderValue.CLOSE.asString());
54 public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
55 private static final boolean REQUEST_BUFFER_DIRECT=false;
56 private static final boolean HEADER_BUFFER_DIRECT=false;
57 private static final boolean CHUNK_BUFFER_DIRECT=false;
58 private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
59
60 private final HttpConfiguration _config;
61 private final Connector _connector;
62 private final ByteBufferPool _bufferPool;
63 private final HttpInput _input;
64 private final HttpGenerator _generator;
65 private final HttpChannelOverHttp _channel;
66 private final HttpParser _parser;
67 private final AtomicInteger _contentBufferReferences=new AtomicInteger();
68 private volatile ByteBuffer _requestBuffer = null;
69 private volatile ByteBuffer _chunk = null;
70 private final BlockingReadCallback _blockingReadCallback = new BlockingReadCallback();
71 private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback();
72 private final SendCallback _sendCallback = new SendCallback();
73
74
75
76
77
78
79
80
81 public static HttpConnection getCurrentConnection()
82 {
83 return __currentConnection.get();
84 }
85
86 protected static HttpConnection setCurrentConnection(HttpConnection connection)
87 {
88 HttpConnection last=__currentConnection.get();
89 __currentConnection.set(connection);
90 return last;
91 }
92
93 public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
94 {
95 super(endPoint, connector.getExecutor());
96 _config = config;
97 _connector = connector;
98 _bufferPool = _connector.getByteBufferPool();
99 _generator = newHttpGenerator();
100 _channel = newHttpChannel();
101 _input = _channel.getRequest().getHttpInput();
102 _parser = newHttpParser();
103 if (LOG.isDebugEnabled())
104 LOG.debug("New HTTP Connection {}", this);
105 }
106
107 public HttpConfiguration getHttpConfiguration()
108 {
109 return _config;
110 }
111
112 protected HttpGenerator newHttpGenerator()
113 {
114 return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
115 }
116
117 protected HttpChannelOverHttp newHttpChannel()
118 {
119 return new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this);
120 }
121
122 protected HttpParser newHttpParser()
123 {
124 return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
125 }
126
127 protected HttpParser.RequestHandler newRequestHandler()
128 {
129 return _channel;
130 }
131
132 public Server getServer()
133 {
134 return _connector.getServer();
135 }
136
137 public Connector getConnector()
138 {
139 return _connector;
140 }
141
142 public HttpChannel getHttpChannel()
143 {
144 return _channel;
145 }
146
147 public HttpParser getParser()
148 {
149 return _parser;
150 }
151
152 public HttpGenerator getGenerator()
153 {
154 return _generator;
155 }
156
157 @Override
158 public boolean isOptimizedForDirectBuffers()
159 {
160 return getEndPoint().isOptimizedForDirectBuffers();
161 }
162
163 @Override
164 public int getMessagesIn()
165 {
166 return getHttpChannel().getRequests();
167 }
168
169 @Override
170 public int getMessagesOut()
171 {
172 return getHttpChannel().getRequests();
173 }
174
175 @Override
176 public ByteBuffer onUpgradeFrom()
177 {
178 if (BufferUtil.hasContent(_requestBuffer))
179 {
180 ByteBuffer buffer = _requestBuffer;
181 _requestBuffer=null;
182 return buffer;
183 }
184 return null;
185 }
186
187 void releaseRequestBuffer()
188 {
189 if (_requestBuffer != null && !_requestBuffer.hasRemaining())
190 {
191 if (LOG.isDebugEnabled())
192 LOG.debug("releaseRequestBuffer {}",this);
193 ByteBuffer buffer=_requestBuffer;
194 _requestBuffer=null;
195 _bufferPool.release(buffer);
196 }
197 }
198
199 public ByteBuffer getRequestBuffer()
200 {
201 if (_requestBuffer == null)
202 _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
203 return _requestBuffer;
204 }
205
206 public boolean isRequestBufferEmpty()
207 {
208 return BufferUtil.isEmpty(_requestBuffer);
209 }
210
211 @Override
212 public void onFillable()
213 {
214 if (LOG.isDebugEnabled())
215 LOG.debug("{} onFillable enter {}", this, _channel.getState());
216
217 HttpConnection last=setCurrentConnection(this);
218 try
219 {
220 while (true)
221 {
222
223 int filled = fillRequestBuffer();
224
225
226 boolean handle = parseRequestBuffer();
227
228
229 if (getEndPoint().getConnection()!=this)
230 break;
231
232
233 if (_parser.isClose() || _parser.isClosed())
234 {
235 close();
236 break;
237 }
238
239
240 if (handle)
241 {
242 boolean suspended = !_channel.handle();
243
244
245 if (suspended || getEndPoint().getConnection() != this)
246 break;
247 }
248
249
250 else if (filled<=0)
251 {
252 if (filled==0)
253 fillInterested();
254 break;
255 }
256 }
257 }
258 finally
259 {
260 setCurrentConnection(last);
261 if (LOG.isDebugEnabled())
262 LOG.debug("{} onFillable exit {}", this, _channel.getState());
263 }
264 }
265
266
267
268
269
270 protected boolean fillAndParseForContent()
271 {
272 boolean handled=false;
273 while (_parser.inContentState())
274 {
275 if (LOG.isDebugEnabled())
276 LOG.debug("{} parseContent",this);
277 int filled = fillRequestBuffer();
278 boolean handle = parseRequestBuffer();
279 handled|=handle;
280 if (handle || filled<=0 || _channel.getRequest().getHttpInput().hasContent())
281 break;
282 }
283 return handled;
284 }
285
286
287 private int fillRequestBuffer()
288 {
289 if (_contentBufferReferences.get()>0)
290 {
291 LOG.warn("{} fill with unconsumed content!",this);
292 return 0;
293 }
294
295 if (BufferUtil.isEmpty(_requestBuffer))
296 {
297
298 if(getEndPoint().isInputShutdown())
299 {
300
301 _parser.atEOF();
302 if (LOG.isDebugEnabled())
303 LOG.debug("{} filled -1",this);
304 return -1;
305 }
306
307
308
309
310 _requestBuffer = getRequestBuffer();
311
312
313 try
314 {
315 int filled = getEndPoint().fill(_requestBuffer);
316 if (filled==0)
317 filled = getEndPoint().fill(_requestBuffer);
318
319
320 if (filled < 0)
321 _parser.atEOF();
322
323 if (LOG.isDebugEnabled())
324 LOG.debug("{} filled {}",this,filled);
325
326 return filled;
327 }
328 catch (IOException e)
329 {
330 LOG.debug(e);
331 return -1;
332 }
333 }
334 return 0;
335 }
336
337
338 private boolean parseRequestBuffer()
339 {
340 if (LOG.isDebugEnabled())
341 LOG.debug("{} parse {} {}",this,BufferUtil.toDetailString(_requestBuffer));
342
343 boolean handle = _parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
344
345 if (LOG.isDebugEnabled())
346 LOG.debug("{} parsed {} {}",this,handle,_parser);
347
348
349 if (_contentBufferReferences.get()==0)
350 releaseRequestBuffer();
351
352 return handle;
353 }
354
355
356 @Override
357 public void onCompleted()
358 {
359
360 if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
361 {
362 Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
363 if (connection != null)
364 {
365 if (LOG.isDebugEnabled())
366 LOG.debug("Upgrade from {} to {}", this, connection);
367 _channel.getState().upgrade();
368 getEndPoint().upgrade(connection);
369 _channel.recycle();
370 _parser.reset();
371 _generator.reset();
372 if (_contentBufferReferences.get()==0)
373 releaseRequestBuffer();
374 else
375 {
376 LOG.warn("{} lingering content references?!?!",this);
377 _requestBuffer=null;
378 _contentBufferReferences.set(0);
379 }
380 return;
381 }
382 }
383
384
385
386 if (_channel.isExpecting100Continue())
387 {
388
389 _parser.close();
390 }
391 else if (_parser.inContentState() && _generator.isPersistent())
392 {
393
394 if (_channel.getRequest().getHttpInput().isAsync())
395 {
396 if (LOG.isDebugEnabled())
397 LOG.debug("unconsumed async input {}", this);
398 _channel.abort(new IOException("unconsumed input"));
399 }
400 else
401 {
402 if (LOG.isDebugEnabled())
403 LOG.debug("unconsumed input {}", this);
404
405 if (!_channel.getRequest().getHttpInput().consumeAll())
406 _channel.abort(new IOException("unconsumed input"));
407 }
408 }
409
410
411 _channel.recycle();
412 if (_generator.isPersistent() && !_parser.isClosed())
413 _parser.reset();
414 else
415 _parser.close();
416
417
418
419 if (_chunk!=null)
420 _bufferPool.release(_chunk);
421 _chunk=null;
422 _generator.reset();
423
424
425 if (getCurrentConnection()!=this)
426 {
427
428 if (_parser.isStart())
429 {
430
431 if (BufferUtil.isEmpty(_requestBuffer))
432 {
433
434 fillInterested();
435 }
436
437 else if (getConnector().isRunning())
438 {
439
440 try
441 {
442 getExecutor().execute(this);
443 }
444 catch (RejectedExecutionException e)
445 {
446 if (getConnector().isRunning())
447 LOG.warn(e);
448 else
449 LOG.ignore(e);
450 getEndPoint().close();
451 }
452 }
453 else
454 {
455 getEndPoint().close();
456 }
457 }
458
459 else if (getEndPoint().isOpen())
460 fillInterested();
461 }
462 }
463
464 @Override
465 protected void onFillInterestedFailed(Throwable cause)
466 {
467 _parser.close();
468 super.onFillInterestedFailed(cause);
469 }
470
471 @Override
472 public void onOpen()
473 {
474 super.onOpen();
475 fillInterested();
476 }
477
478 @Override
479 public void onClose()
480 {
481 _sendCallback.close();
482 super.onClose();
483 }
484
485 @Override
486 public void run()
487 {
488 onFillable();
489 }
490
491 @Override
492 public void send(MetaData.Response info, boolean head, ByteBuffer content, boolean lastContent, Callback callback)
493 {
494 if (info == null)
495 {
496 if (!lastContent && BufferUtil.isEmpty(content))
497 {
498 callback.succeeded();
499 return;
500 }
501 }
502 else
503 {
504
505 if (_channel.isExpecting100Continue())
506
507 _generator.setPersistent(false);
508 }
509
510 if(_sendCallback.reset(info,head,content,lastContent,callback))
511 _sendCallback.iterate();
512 }
513
514
515 HttpInput.Content newContent(ByteBuffer c)
516 {
517 return new Content(c);
518 }
519
520 @Override
521 public void abort(Throwable failure)
522 {
523
524
525 getEndPoint().close();
526 }
527
528 @Override
529 public boolean isPushSupported()
530 {
531 return false;
532 }
533
534 @Override
535 public void push(org.eclipse.jetty.http.MetaData.Request request)
536 {
537 LOG.debug("ignore push in {}",this);
538 }
539
540 public void asyncReadFillInterested()
541 {
542 getEndPoint().fillInterested(_asyncReadCallback);
543 }
544
545 public void blockingReadFillInterested()
546 {
547 getEndPoint().fillInterested(_blockingReadCallback);
548 }
549
550 public void blockingReadException(Throwable e)
551 {
552 _blockingReadCallback.failed(e);
553 }
554
555 @Override
556 public String toString()
557 {
558 return String.format("%s[p=%s,g=%s,c=%s][b=%s]",
559 super.toString(),
560 _parser,
561 _generator,
562 _channel,
563 BufferUtil.toDetailString(_requestBuffer));
564 }
565
566 private class Content extends HttpInput.Content
567 {
568 public Content(ByteBuffer content)
569 {
570 super(content);
571 _contentBufferReferences.incrementAndGet();
572 }
573
574 @Override
575 public void succeeded()
576 {
577 if (_contentBufferReferences.decrementAndGet()==0)
578 releaseRequestBuffer();
579 }
580
581 @Override
582 public void failed(Throwable x)
583 {
584 succeeded();
585 }
586 }
587
588 private class BlockingReadCallback implements Callback
589 {
590 @Override
591 public void succeeded()
592 {
593 _input.unblock();
594 }
595
596 @Override
597 public void failed(Throwable x)
598 {
599 _input.failed(x);
600 }
601
602 @Override
603 public boolean isNonBlocking()
604 {
605
606
607 return true;
608 }
609 }
610
611 private class AsyncReadCallback implements Callback
612 {
613 @Override
614 public void succeeded()
615 {
616 if (fillAndParseForContent())
617 _channel.handle();
618 else if (!_input.isFinished())
619 asyncReadFillInterested();
620 }
621
622 @Override
623 public void failed(Throwable x)
624 {
625 if (_input.failed(x))
626 _channel.handle();
627 }
628 }
629
630 private class SendCallback extends IteratingCallback
631 {
632 private MetaData.Response _info;
633 private boolean _head;
634 private ByteBuffer _content;
635 private boolean _lastContent;
636 private Callback _callback;
637 private ByteBuffer _header;
638 private boolean _shutdownOut;
639
640 private SendCallback()
641 {
642 super(true);
643 }
644
645 @Override
646 public boolean isNonBlocking()
647 {
648 return _callback.isNonBlocking();
649 }
650
651 private boolean reset(MetaData.Response info, boolean head, ByteBuffer content, boolean last, Callback callback)
652 {
653 if (reset())
654 {
655 _info = info;
656 _head = head;
657 _content = content;
658 _lastContent = last;
659 _callback = callback;
660 _header = null;
661 _shutdownOut = false;
662 return true;
663 }
664
665 if (isClosed())
666 callback.failed(new EofException());
667 else
668 callback.failed(new WritePendingException());
669 return false;
670 }
671
672 @Override
673 public Action process() throws Exception
674 {
675 if (_callback==null)
676 throw new IllegalStateException();
677
678 ByteBuffer chunk = _chunk;
679 while (true)
680 {
681 HttpGenerator.Result result = _generator.generateResponse(_info, _head, _header, chunk, _content, _lastContent);
682 if (LOG.isDebugEnabled())
683 LOG.debug("{} generate: {} ({},{},{})@{}",
684 this,
685 result,
686 BufferUtil.toSummaryString(_header),
687 BufferUtil.toSummaryString(_content),
688 _lastContent,
689 _generator.getState());
690
691 switch (result)
692 {
693 case NEED_HEADER:
694 {
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712 _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
713
714 continue;
715 }
716 case NEED_CHUNK:
717 {
718 chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
719 continue;
720 }
721 case FLUSH:
722 {
723
724 if (_head || _generator.isNoContent())
725 {
726 BufferUtil.clear(chunk);
727 BufferUtil.clear(_content);
728 }
729
730
731 if (BufferUtil.hasContent(_header))
732 {
733 if (BufferUtil.hasContent(_content))
734 {
735 if (BufferUtil.hasContent(chunk))
736 getEndPoint().write(this, _header, chunk, _content);
737 else
738 getEndPoint().write(this, _header, _content);
739 }
740 else
741 getEndPoint().write(this, _header);
742 }
743 else if (BufferUtil.hasContent(chunk))
744 {
745 if (BufferUtil.hasContent(_content))
746 getEndPoint().write(this, chunk, _content);
747 else
748 getEndPoint().write(this, chunk);
749 }
750 else if (BufferUtil.hasContent(_content))
751 {
752 getEndPoint().write(this, _content);
753 }
754 else
755 {
756 succeeded();
757 }
758 return Action.SCHEDULED;
759 }
760 case SHUTDOWN_OUT:
761 {
762 _shutdownOut=true;
763 continue;
764 }
765 case DONE:
766 {
767 return Action.SUCCEEDED;
768 }
769 case CONTINUE:
770 {
771 break;
772 }
773 default:
774 {
775 throw new IllegalStateException("generateResponse="+result);
776 }
777 }
778 }
779 }
780
781 private void releaseHeader()
782 {
783 ByteBuffer h=_header;
784 _header=null;
785 if (h!=null)
786 _bufferPool.release(h);
787 }
788
789 @Override
790 protected void onCompleteSuccess()
791 {
792 releaseHeader();
793 _callback.succeeded();
794 if (_shutdownOut)
795 getEndPoint().shutdownOutput();
796 }
797
798 @Override
799 public void onCompleteFailure(final Throwable x)
800 {
801 releaseHeader();
802 failedCallback(_callback,x);
803 if (_shutdownOut)
804 getEndPoint().shutdownOutput();
805 }
806
807 @Override
808 public String toString()
809 {
810 return String.format("%s[i=%s,cb=%s]",super.toString(),_info,_callback);
811 }
812 }
813 }