1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.WritePendingException;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import org.eclipse.jetty.http.HttpField;
28 import org.eclipse.jetty.http.HttpGenerator;
29 import org.eclipse.jetty.http.HttpHeader;
30 import org.eclipse.jetty.http.HttpHeaderValue;
31 import org.eclipse.jetty.http.HttpParser;
32 import org.eclipse.jetty.http.HttpParser.RequestHandler;
33 import org.eclipse.jetty.http.HttpStatus;
34 import org.eclipse.jetty.http.MetaData;
35 import org.eclipse.jetty.http.PreEncodedHttpField;
36 import org.eclipse.jetty.io.AbstractConnection;
37 import org.eclipse.jetty.io.ByteBufferPool;
38 import org.eclipse.jetty.io.Connection;
39 import org.eclipse.jetty.io.EndPoint;
40 import org.eclipse.jetty.io.EofException;
41 import org.eclipse.jetty.util.BufferUtil;
42 import org.eclipse.jetty.util.Callback;
43 import org.eclipse.jetty.util.IteratingCallback;
44 import org.eclipse.jetty.util.log.Log;
45 import org.eclipse.jetty.util.log.Logger;
46
47
48
49
50 public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom
51 {
52 private static final Logger LOG = Log.getLogger(HttpConnection.class);
53 public static final HttpField CONNECTION_CLOSE = new PreEncodedHttpField(HttpHeader.CONNECTION,HttpHeaderValue.CLOSE.asString());
54 public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
55 private static final boolean REQUEST_BUFFER_DIRECT=false;
56 private static final boolean HEADER_BUFFER_DIRECT=false;
57 private static final boolean CHUNK_BUFFER_DIRECT=false;
58 private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
59
60 private final HttpConfiguration _config;
61 private final Connector _connector;
62 private final ByteBufferPool _bufferPool;
63 private final HttpInput _input;
64 private final HttpGenerator _generator;
65 private final HttpChannelOverHttp _channel;
66 private final HttpParser _parser;
67 private final AtomicInteger _contentBufferReferences=new AtomicInteger();
68 private volatile ByteBuffer _requestBuffer = null;
69 private volatile ByteBuffer _chunk = null;
70 private final BlockingReadCallback _blockingReadCallback = new BlockingReadCallback();
71 private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback();
72 private final SendCallback _sendCallback = new SendCallback();
73
74
75
76
77
78
79
80
81 public static HttpConnection getCurrentConnection()
82 {
83 return __currentConnection.get();
84 }
85
86 protected static HttpConnection setCurrentConnection(HttpConnection connection)
87 {
88 HttpConnection last=__currentConnection.get();
89 __currentConnection.set(connection);
90 return last;
91 }
92
/**
 * Creates the connection and its generator, channel and parser.
 *
 * @param config    the HTTP configuration used by the factory methods
 * @param connector the connector supplying the executor and the buffer pool
 * @param endPoint  the endpoint this connection reads from and writes to
 */
public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
{
    super(endPoint, connector.getExecutor());

    this._config = config;
    this._connector = connector;
    this._bufferPool = this._connector.getByteBufferPool();

    // Order matters: the channel factory reads _config/_connector, and the
    // parser factory (via newRequestHandler) reads _channel.
    this._generator = newHttpGenerator();
    this._channel = newHttpChannel();
    this._input = this._channel.getRequest().getHttpInput();
    this._parser = newHttpParser();

    if (LOG.isDebugEnabled())
    {
        LOG.debug("New HTTP Connection {}", this);
    }
}
106
107 public HttpConfiguration getHttpConfiguration()
108 {
109 return _config;
110 }
111
112 protected HttpGenerator newHttpGenerator()
113 {
114 return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
115 }
116
117 protected HttpChannelOverHttp newHttpChannel()
118 {
119 return new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this);
120 }
121
122 protected HttpParser newHttpParser()
123 {
124 return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
125 }
126
127 protected HttpParser.RequestHandler newRequestHandler()
128 {
129 return _channel;
130 }
131
132 public Server getServer()
133 {
134 return _connector.getServer();
135 }
136
137 public Connector getConnector()
138 {
139 return _connector;
140 }
141
142 public HttpChannel getHttpChannel()
143 {
144 return _channel;
145 }
146
147 public HttpParser getParser()
148 {
149 return _parser;
150 }
151
152 public HttpGenerator getGenerator()
153 {
154 return _generator;
155 }
156
157 @Override
158 public boolean isOptimizedForDirectBuffers()
159 {
160 return getEndPoint().isOptimizedForDirectBuffers();
161 }
162
163 @Override
164 public int getMessagesIn()
165 {
166 return getHttpChannel().getRequests();
167 }
168
169 @Override
170 public int getMessagesOut()
171 {
172 return getHttpChannel().getRequests();
173 }
174
175 @Override
176 public ByteBuffer onUpgradeFrom()
177 {
178 if (BufferUtil.hasContent(_requestBuffer))
179 {
180 ByteBuffer buffer = _requestBuffer;
181 _requestBuffer=null;
182 return buffer;
183 }
184 return null;
185 }
186
187 void releaseRequestBuffer()
188 {
189 if (_requestBuffer != null && !_requestBuffer.hasRemaining())
190 {
191 if (LOG.isDebugEnabled())
192 LOG.debug("releaseRequestBuffer {}",this);
193 ByteBuffer buffer=_requestBuffer;
194 _requestBuffer=null;
195 _bufferPool.release(buffer);
196 }
197 }
198
199 public ByteBuffer getRequestBuffer()
200 {
201 if (_requestBuffer == null)
202 _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
203 return _requestBuffer;
204 }
205
206 public boolean isRequestBufferEmpty()
207 {
208 return BufferUtil.isEmpty(_requestBuffer);
209 }
210
211 @Override
212 public void onFillable()
213 {
214 if (LOG.isDebugEnabled())
215 LOG.debug("{} onFillable enter {} {}", this, _channel.getState(),BufferUtil.toDetailString(_requestBuffer));
216
217 HttpConnection last=setCurrentConnection(this);
218 try
219 {
220 while (true)
221 {
222
223 int filled = fillRequestBuffer();
224
225
226 boolean handle = parseRequestBuffer();
227
228
229 if (getEndPoint().getConnection()!=this)
230 break;
231
232
233 if (_parser.isClose() || _parser.isClosed())
234 {
235 close();
236 break;
237 }
238
239
240 if (handle)
241 {
242 boolean suspended = !_channel.handle();
243
244
245 if (suspended || getEndPoint().getConnection() != this)
246 break;
247 }
248
249
250 else if (filled<=0)
251 {
252 if (filled==0)
253 fillInterested();
254 break;
255 }
256 }
257 }
258 finally
259 {
260 setCurrentConnection(last);
261 if (LOG.isDebugEnabled())
262 LOG.debug("{} onFillable exit {} {}", this, _channel.getState(),BufferUtil.toDetailString(_requestBuffer));
263 }
264 }
265
266
267
268
269
270 protected boolean fillAndParseForContent()
271 {
272 boolean handled=false;
273 while (_parser.inContentState())
274 {
275 int filled = fillRequestBuffer();
276 boolean handle = parseRequestBuffer();
277 handled|=handle;
278 if (handle || filled<=0 || _channel.getRequest().getHttpInput().hasContent())
279 break;
280 }
281 return handled;
282 }
283
284
285 private int fillRequestBuffer()
286 {
287 if (_contentBufferReferences.get()>0)
288 {
289 LOG.warn("{} fill with unconsumed content!",this);
290 return 0;
291 }
292
293 if (BufferUtil.isEmpty(_requestBuffer))
294 {
295
296 if(getEndPoint().isInputShutdown())
297 {
298
299 _parser.atEOF();
300 if (LOG.isDebugEnabled())
301 LOG.debug("{} filled -1 {}",this,BufferUtil.toDetailString(_requestBuffer));
302 return -1;
303 }
304
305
306
307
308 _requestBuffer = getRequestBuffer();
309
310
311 try
312 {
313 int filled = getEndPoint().fill(_requestBuffer);
314 if (filled==0)
315 filled = getEndPoint().fill(_requestBuffer);
316
317
318 if (filled < 0)
319 _parser.atEOF();
320
321 if (LOG.isDebugEnabled())
322 LOG.debug("{} filled {} {}",this,filled,BufferUtil.toDetailString(_requestBuffer));
323
324 return filled;
325 }
326 catch (IOException e)
327 {
328 LOG.debug(e);
329 return -1;
330 }
331 }
332 return 0;
333 }
334
335
336 private boolean parseRequestBuffer()
337 {
338 if (LOG.isDebugEnabled())
339 LOG.debug("{} parse {} {}",this,BufferUtil.toDetailString(_requestBuffer));
340
341 boolean handle = _parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
342
343 if (LOG.isDebugEnabled())
344 LOG.debug("{} parsed {} {}",this,handle,_parser);
345
346
347 if (_contentBufferReferences.get()==0)
348 releaseRequestBuffer();
349
350 return handle;
351 }
352
353
354 @Override
355 public void onCompleted()
356 {
357
358 if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
359 {
360 Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
361 if (connection != null)
362 {
363 if (LOG.isDebugEnabled())
364 LOG.debug("Upgrade from {} to {}", this, connection);
365 _channel.getState().upgrade();
366 getEndPoint().upgrade(connection);
367 _channel.recycle();
368 _parser.reset();
369 _generator.reset();
370 if (_contentBufferReferences.get()==0)
371 releaseRequestBuffer();
372 else
373 {
374 LOG.warn("{} lingering content references?!?!",this);
375 _requestBuffer=null;
376 _contentBufferReferences.set(0);
377 }
378 return;
379 }
380 }
381
382
383
384 if (_channel.isExpecting100Continue())
385 {
386
387 _parser.close();
388 }
389 else if (_parser.inContentState() && _generator.isPersistent())
390 {
391
392 if (_channel.getRequest().getHttpInput().isAsync())
393 {
394 if (LOG.isDebugEnabled())
395 LOG.debug("unconsumed async input {}", this);
396 _channel.abort(new IOException("unconsumed input"));
397 }
398 else
399 {
400 if (LOG.isDebugEnabled())
401 LOG.debug("unconsumed input {}", this);
402
403 if (!_channel.getRequest().getHttpInput().consumeAll())
404 _channel.abort(new IOException("unconsumed input"));
405 }
406 }
407
408
409 _channel.recycle();
410 if (_generator.isPersistent() && !_parser.isClosed())
411 _parser.reset();
412 else
413 _parser.close();
414
415
416
417 if (_chunk!=null)
418 _bufferPool.release(_chunk);
419 _chunk=null;
420 _generator.reset();
421
422
423 if (getCurrentConnection()!=this)
424 {
425
426 if (_parser.isStart())
427 {
428
429 if (BufferUtil.isEmpty(_requestBuffer))
430 {
431
432 fillInterested();
433 }
434
435 else if (getConnector().isRunning())
436 {
437
438 try
439 {
440 getExecutor().execute(this);
441 }
442 catch (RejectedExecutionException e)
443 {
444 if (getConnector().isRunning())
445 LOG.warn(e);
446 else
447 LOG.ignore(e);
448 getEndPoint().close();
449 }
450 }
451 else
452 {
453 getEndPoint().close();
454 }
455 }
456
457 else if (getEndPoint().isOpen())
458 fillInterested();
459 }
460 }
461
462 @Override
463 protected void onFillInterestedFailed(Throwable cause)
464 {
465 _parser.close();
466 super.onFillInterestedFailed(cause);
467 }
468
469 @Override
470 public void onOpen()
471 {
472 super.onOpen();
473 fillInterested();
474 }
475
476 @Override
477 public void onClose()
478 {
479 _sendCallback.close();
480 super.onClose();
481 }
482
483 @Override
484 public void run()
485 {
486 onFillable();
487 }
488
489 @Override
490 public void send(MetaData.Response info, boolean head, ByteBuffer content, boolean lastContent, Callback callback)
491 {
492 if (info == null)
493 {
494 if (!lastContent && BufferUtil.isEmpty(content))
495 {
496 callback.succeeded();
497 return;
498 }
499 }
500 else
501 {
502
503 if (_channel.isExpecting100Continue())
504
505 _generator.setPersistent(false);
506 }
507
508 if(_sendCallback.reset(info,head,content,lastContent,callback))
509 _sendCallback.iterate();
510 }
511
512
513 HttpInput.Content newContent(ByteBuffer c)
514 {
515 return new Content(c);
516 }
517
518 @Override
519 public void abort(Throwable failure)
520 {
521
522
523 getEndPoint().close();
524 }
525
526 @Override
527 public boolean isPushSupported()
528 {
529 return false;
530 }
531
532 @Override
533 public void push(org.eclipse.jetty.http.MetaData.Request request)
534 {
535 LOG.debug("ignore push in {}",this);
536 }
537
538 public void asyncReadFillInterested()
539 {
540 getEndPoint().fillInterested(_asyncReadCallback);
541 }
542
543 public void blockingReadFillInterested()
544 {
545 getEndPoint().fillInterested(_blockingReadCallback);
546 }
547
548 public void blockingReadException(Throwable e)
549 {
550 _blockingReadCallback.failed(e);
551 }
552
553 @Override
554 public String toString()
555 {
556 return String.format("%s[p=%s,g=%s,c=%s]",
557 super.toString(),
558 _parser,
559 _generator,
560 _channel);
561 }
562
563 private class Content extends HttpInput.Content
564 {
565 public Content(ByteBuffer content)
566 {
567 super(content);
568 _contentBufferReferences.incrementAndGet();
569 }
570
571 @Override
572 public void succeeded()
573 {
574 if (_contentBufferReferences.decrementAndGet()==0)
575 releaseRequestBuffer();
576 }
577
578 @Override
579 public void failed(Throwable x)
580 {
581 succeeded();
582 }
583 }
584
585 private class BlockingReadCallback implements Callback
586 {
587 @Override
588 public void succeeded()
589 {
590 _input.unblock();
591 }
592
593 @Override
594 public void failed(Throwable x)
595 {
596 _input.failed(x);
597 }
598
599 @Override
600 public boolean isNonBlocking()
601 {
602
603
604 return true;
605 }
606 }
607
608 private class AsyncReadCallback implements Callback
609 {
610 @Override
611 public void succeeded()
612 {
613 if (fillAndParseForContent())
614 _channel.handle();
615 else if (!_input.isFinished())
616 asyncReadFillInterested();
617 }
618
619 @Override
620 public void failed(Throwable x)
621 {
622 if (_input.failed(x))
623 _channel.handle();
624 }
625 }
626
627 private class SendCallback extends IteratingCallback
628 {
629 private MetaData.Response _info;
630 private boolean _head;
631 private ByteBuffer _content;
632 private boolean _lastContent;
633 private Callback _callback;
634 private ByteBuffer _header;
635 private boolean _shutdownOut;
636
637 private SendCallback()
638 {
639 super(true);
640 }
641
642 @Override
643 public boolean isNonBlocking()
644 {
645 return _callback.isNonBlocking();
646 }
647
648 private boolean reset(MetaData.Response info, boolean head, ByteBuffer content, boolean last, Callback callback)
649 {
650 if (reset())
651 {
652 _info = info;
653 _head = head;
654 _content = content;
655 _lastContent = last;
656 _callback = callback;
657 _header = null;
658 _shutdownOut = false;
659 return true;
660 }
661
662 if (isClosed())
663 callback.failed(new EofException());
664 else
665 callback.failed(new WritePendingException());
666 return false;
667 }
668
669 @Override
670 public Action process() throws Exception
671 {
672 if (_callback==null)
673 throw new IllegalStateException();
674
675 ByteBuffer chunk = _chunk;
676 while (true)
677 {
678 HttpGenerator.Result result = _generator.generateResponse(_info, _head, _header, chunk, _content, _lastContent);
679 if (LOG.isDebugEnabled())
680 LOG.debug("{} generate: {} ({},{},{})@{}",
681 this,
682 result,
683 BufferUtil.toSummaryString(_header),
684 BufferUtil.toSummaryString(_content),
685 _lastContent,
686 _generator.getState());
687
688 switch (result)
689 {
690 case NEED_HEADER:
691 {
692 _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
693
694 continue;
695 }
696 case NEED_CHUNK:
697 {
698 chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
699 continue;
700 }
701 case FLUSH:
702 {
703
704 if (_head || _generator.isNoContent())
705 {
706 BufferUtil.clear(chunk);
707 BufferUtil.clear(_content);
708 }
709
710
711 if (BufferUtil.hasContent(_header))
712 {
713 if (BufferUtil.hasContent(_content))
714 {
715 if (BufferUtil.hasContent(chunk))
716 getEndPoint().write(this, _header, chunk, _content);
717 else
718 getEndPoint().write(this, _header, _content);
719 }
720 else
721 getEndPoint().write(this, _header);
722 }
723 else if (BufferUtil.hasContent(chunk))
724 {
725 if (BufferUtil.hasContent(_content))
726 getEndPoint().write(this, chunk, _content);
727 else
728 getEndPoint().write(this, chunk);
729 }
730 else if (BufferUtil.hasContent(_content))
731 {
732 getEndPoint().write(this, _content);
733 }
734 else
735 {
736 succeeded();
737 }
738 return Action.SCHEDULED;
739 }
740 case SHUTDOWN_OUT:
741 {
742 _shutdownOut=true;
743 continue;
744 }
745 case DONE:
746 {
747 return Action.SUCCEEDED;
748 }
749 case CONTINUE:
750 {
751 break;
752 }
753 default:
754 {
755 throw new IllegalStateException("generateResponse="+result);
756 }
757 }
758 }
759 }
760
761 private void releaseHeader()
762 {
763 ByteBuffer h=_header;
764 _header=null;
765 if (h!=null)
766 _bufferPool.release(h);
767 }
768
769 @Override
770 protected void onCompleteSuccess()
771 {
772 releaseHeader();
773 _callback.succeeded();
774 if (_shutdownOut)
775 getEndPoint().shutdownOutput();
776 }
777
778 @Override
779 public void onCompleteFailure(final Throwable x)
780 {
781 releaseHeader();
782 failedCallback(_callback,x);
783 if (_shutdownOut)
784 getEndPoint().shutdownOutput();
785 }
786
787 @Override
788 public String toString()
789 {
790 return String.format("%s[i=%s,cb=%s]",super.toString(),_info,_callback);
791 }
792 }
793 }