1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.WritePendingException;
24 import java.util.concurrent.RejectedExecutionException;
25
26 import org.eclipse.jetty.http.HttpGenerator;
27 import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
28 import org.eclipse.jetty.http.HttpHeader;
29 import org.eclipse.jetty.http.HttpHeaderValue;
30 import org.eclipse.jetty.http.HttpMethod;
31 import org.eclipse.jetty.http.HttpParser;
32 import org.eclipse.jetty.http.HttpStatus;
33 import org.eclipse.jetty.http.HttpVersion;
34 import org.eclipse.jetty.io.AbstractConnection;
35 import org.eclipse.jetty.io.ByteBufferPool;
36 import org.eclipse.jetty.io.Connection;
37 import org.eclipse.jetty.io.EndPoint;
38 import org.eclipse.jetty.io.EofException;
39 import org.eclipse.jetty.util.BufferUtil;
40 import org.eclipse.jetty.util.Callback;
41 import org.eclipse.jetty.util.IteratingCallback;
42 import org.eclipse.jetty.util.log.Log;
43 import org.eclipse.jetty.util.log.Logger;
44
45
46
47
48 public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport
49 {
50 public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
51 private static final boolean REQUEST_BUFFER_DIRECT=false;
52 private static final boolean HEADER_BUFFER_DIRECT=false;
53 private static final boolean CHUNK_BUFFER_DIRECT=false;
54 private static final Logger LOG = Log.getLogger(HttpConnection.class);
55 private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
56
57 private final HttpConfiguration _config;
58 private final Connector _connector;
59 private final ByteBufferPool _bufferPool;
60 private final HttpGenerator _generator;
61 private final HttpChannelOverHttp _channel;
62 private final HttpParser _parser;
63 private volatile ByteBuffer _requestBuffer = null;
64 private volatile ByteBuffer _chunk = null;
65 private final SendCallback _sendCallback = new SendCallback();
66
67
68
69
70
71
72
73
74
75 public static HttpConnection getCurrentConnection()
76 {
77 return __currentConnection.get();
78 }
79
80 protected static HttpConnection setCurrentConnection(HttpConnection connection)
81 {
82 HttpConnection last=__currentConnection.get();
83 if (connection==null)
84 __currentConnection.remove();
85 else
86 __currentConnection.set(connection);
87 return last;
88 }
89
90 public HttpConfiguration getHttpConfiguration()
91 {
92 return _config;
93 }
94
/**
 * Creates a connection over the given end point and builds its generator, channel and parser
 * through the overridable {@code newXxx()} factory methods.
 *
 * @param config    the HTTP configuration used by the generator, channel and parser factories
 * @param connector the connector that supplies the executor and the buffer pool
 * @param endPoint  the end point this connection reads from and writes to
 */
public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
{
    // NOTE(review): the meaning of the trailing 'true' is defined by the superclass constructor - confirm there.
    super(endPoint, connector.getExecutor(), true);

    _config = config;
    _connector = connector;
    _bufferPool = connector.getByteBufferPool();

    // Order matters: the default newHttpParser() obtains its handler from newRequestHandler(),
    // which returns _channel, so the channel must exist before the parser is created.
    _generator = newHttpGenerator();
    _channel = newHttpChannel(newHttpInput());
    _parser = newHttpParser();

    if (LOG.isDebugEnabled())
    {
        LOG.debug("New HTTP Connection {}", this);
    }
}
111
112 protected HttpGenerator newHttpGenerator()
113 {
114 return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
115 }
116
117 protected HttpInput<ByteBuffer> newHttpInput()
118 {
119 return new HttpInputOverHTTP(this);
120 }
121
122 protected HttpChannelOverHttp newHttpChannel(HttpInput<ByteBuffer> httpInput)
123 {
124 return new HttpChannelOverHttp(_connector, _config, getEndPoint(), this, httpInput);
125 }
126
127 protected HttpParser newHttpParser()
128 {
129 return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
130 }
131
132 protected HttpParser.RequestHandler<ByteBuffer> newRequestHandler()
133 {
134 return _channel;
135 }
136
137 public Server getServer()
138 {
139 return _connector.getServer();
140 }
141
142 public Connector getConnector()
143 {
144 return _connector;
145 }
146
147 public HttpChannel<?> getHttpChannel()
148 {
149 return _channel;
150 }
151
152 public HttpParser getParser()
153 {
154 return _parser;
155 }
156
157 @Override
158 public int getMessagesIn()
159 {
160 return getHttpChannel().getRequests();
161 }
162
163 @Override
164 public int getMessagesOut()
165 {
166 return getHttpChannel().getRequests();
167 }
168
169 void releaseRequestBuffer()
170 {
171 if (_requestBuffer != null && !_requestBuffer.hasRemaining())
172 {
173 ByteBuffer buffer=_requestBuffer;
174 _requestBuffer=null;
175 _bufferPool.release(buffer);
176 }
177 }
178
179 public ByteBuffer getRequestBuffer()
180 {
181 if (_requestBuffer == null)
182 _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
183 return _requestBuffer;
184 }
185
186
187
188
189
190
191
192
193
194 @Override
195 public void onFillable()
196 {
197 if (LOG.isDebugEnabled())
198 LOG.debug("{} onFillable {}", this, _channel.getState());
199
200 final HttpConnection last=setCurrentConnection(this);
201 int filled=Integer.MAX_VALUE;
202 boolean suspended=false;
203 try
204 {
205
206 while (!suspended && getEndPoint().getConnection()==this)
207 {
208
209 if (BufferUtil.isEmpty(_requestBuffer))
210 {
211
212 if (filled<=0)
213 break;
214
215
216 if(getEndPoint().isInputShutdown())
217 {
218
219 filled=-1;
220 _parser.atEOF();
221 }
222 else
223 {
224
225
226
227 _requestBuffer = getRequestBuffer();
228
229
230 filled = getEndPoint().fill(_requestBuffer);
231 if (filled==0)
232 filled = getEndPoint().fill(_requestBuffer);
233
234
235 if (filled < 0)
236 _parser.atEOF();
237 }
238 }
239
240
241 if (_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
242 {
243
244
245
246
247
248 suspended = !_channel.handle();
249 }
250 else
251 {
252
253
254
255 releaseRequestBuffer();
256 }
257 }
258 }
259 catch (EofException e)
260 {
261 LOG.debug(e);
262 }
263 catch (Exception e)
264 {
265 if (_parser.isIdle())
266 LOG.debug(e);
267 else
268 LOG.warn(this.toString(), e);
269 close();
270 }
271 finally
272 {
273 setCurrentConnection(last);
274 if (!suspended && getEndPoint().isOpen() && getEndPoint().getConnection()==this)
275 {
276 fillInterested();
277 }
278 }
279 }
280
281
282
283
284
285 protected void parseContent() throws IOException
286 {
287
288
289
290
291 ByteBuffer requestBuffer = getRequestBuffer();
292
293 while (_parser.inContentState())
294 {
295
296 boolean parsed = _parser.parseNext(requestBuffer==null?BufferUtil.EMPTY_BUFFER:requestBuffer);
297
298
299 if (BufferUtil.isEmpty(requestBuffer) && getEndPoint().isInputShutdown())
300 {
301 _parser.atEOF();
302 if (parsed)
303 break;
304 continue;
305 }
306
307 if (parsed)
308 break;
309
310
311 int filled=getEndPoint().fill(requestBuffer);
312 if (LOG.isDebugEnabled())
313 LOG.debug("{} filled {}",this,filled);
314 if (filled<=0)
315 {
316 if (filled<0)
317 {
318 _parser.atEOF();
319 continue;
320 }
321 break;
322 }
323 }
324 }
325
326 @Override
327 public void completed()
328 {
329
330 if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
331 {
332 Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
333 if (connection != null)
334 {
335 if (LOG.isDebugEnabled())
336 LOG.debug("Upgrade from {} to {}", this, connection);
337 onClose();
338 getEndPoint().setConnection(connection);
339 connection.onOpen();
340 _channel.reset();
341 _parser.reset();
342 _generator.reset();
343 releaseRequestBuffer();
344 return;
345 }
346 }
347
348
349
350 if (_channel.isExpecting100Continue())
351 {
352
353 _parser.close();
354 }
355 else if (_parser.inContentState() && _generator.isPersistent())
356 {
357
358 if (_channel.getRequest().getHttpInput().isAsync())
359 {
360 if (LOG.isDebugEnabled())
361 LOG.debug("unconsumed async input {}", this);
362 _channel.abort();
363 }
364 else
365 {
366 if (LOG.isDebugEnabled())
367 LOG.debug("unconsumed input {}", this);
368
369 if (!_channel.getRequest().getHttpInput().consumeAll())
370 _channel.abort();
371 }
372 }
373
374
375 _channel.reset();
376 if (_generator.isPersistent() && !_parser.isClosed())
377 _parser.reset();
378 else
379 _parser.close();
380
381
382
383 releaseRequestBuffer();
384 if (_chunk!=null)
385 _bufferPool.release(_chunk);
386 _chunk=null;
387 _generator.reset();
388
389
390 if (getCurrentConnection()!=this)
391 {
392
393 if (_parser.isStart())
394 {
395
396 if (BufferUtil.isEmpty(_requestBuffer))
397 {
398
399 fillInterested();
400 }
401
402 else if (getConnector().isRunning())
403 {
404
405 try
406 {
407 getExecutor().execute(this);
408 }
409 catch (RejectedExecutionException e)
410 {
411 if (getConnector().isRunning())
412 LOG.warn(e);
413 else
414 LOG.ignore(e);
415 getEndPoint().close();
416 }
417 }
418 else
419 {
420 getEndPoint().close();
421 }
422 }
423
424 else if (getEndPoint().isOpen())
425 fillInterested();
426 }
427 }
428
429 @Override
430 protected void onFillInterestedFailed(Throwable cause)
431 {
432 _parser.close();
433 super.onFillInterestedFailed(cause);
434 }
435
436 @Override
437 public void onOpen()
438 {
439 super.onOpen();
440 fillInterested();
441 }
442
443 @Override
444 public void onClose()
445 {
446 _sendCallback.close();
447 super.onClose();
448 }
449
450 @Override
451 public void run()
452 {
453 onFillable();
454 }
455
456 @Override
457 public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
458 {
459
460 if (info!=null && _channel.isExpecting100Continue())
461
462 _generator.setPersistent(false);
463
464 if(_sendCallback.reset(info,content,lastContent,callback))
465 _sendCallback.iterate();
466 }
467
468 @Override
469 public void send(ByteBuffer content, boolean lastContent, Callback callback)
470 {
471 if (!lastContent && BufferUtil.isEmpty(content))
472 callback.succeeded();
473 else if (_sendCallback.reset(null,content,lastContent,callback))
474 _sendCallback.iterate();
475 }
476
477 protected class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
478 {
479 public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
480 {
481 super(connector,config,endPoint,transport,input);
482 }
483
484 @Override
485 public void earlyEOF()
486 {
487
488 if (getRequest().getMethod()==null)
489 close();
490 else
491 super.earlyEOF();
492 }
493
494 @Override
495 public boolean content(ByteBuffer item)
496 {
497 super.content(item);
498 return true;
499 }
500
501 @Override
502 public void badMessage(int status, String reason)
503 {
504 _generator.setPersistent(false);
505 super.badMessage(status,reason);
506 }
507
508 @Override
509 public boolean headerComplete()
510 {
511 boolean persistent;
512 HttpVersion version = getHttpVersion();
513
514 switch (version)
515 {
516 case HTTP_0_9:
517 {
518 persistent = false;
519 break;
520 }
521 case HTTP_1_0:
522 {
523 persistent = getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString());
524 if (!persistent)
525 persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
526 if (persistent)
527 getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE);
528 break;
529 }
530 case HTTP_1_1:
531 {
532 persistent = !getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
533 if (!persistent)
534 persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
535 if (!persistent)
536 getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
537 break;
538 }
539 default:
540 {
541 throw new IllegalStateException();
542 }
543 }
544
545 if (!persistent)
546 _generator.setPersistent(false);
547
548 if (!super.headerComplete())
549 return false;
550
551
552
553 if (getHttpConfiguration().isDelayDispatchUntilContent() && _parser.getContentLength() > 0 &&
554 !isExpecting100Continue() && !isCommitted() && BufferUtil.isEmpty(_requestBuffer))
555 return false;
556
557 return true;
558 }
559
560 @Override
561 protected void handleException(Throwable x)
562 {
563 _generator.setPersistent(false);
564 super.handleException(x);
565 }
566
567 @Override
568 public void abort()
569 {
570 super.abort();
571 _generator.setPersistent(false);
572 }
573
574 @Override
575 public boolean messageComplete()
576 {
577 super.messageComplete();
578 return false;
579 }
580 }
581
582 private class SendCallback extends IteratingCallback
583 {
584 private ResponseInfo _info;
585 private ByteBuffer _content;
586 private boolean _lastContent;
587 private Callback _callback;
588 private ByteBuffer _header;
589 private boolean _shutdownOut;
590
591 private SendCallback()
592 {
593 super(true);
594 }
595
596 private boolean reset(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
597 {
598 if (reset())
599 {
600 _info = info;
601 _content = content;
602 _lastContent = last;
603 _callback = callback;
604 _header = null;
605 _shutdownOut = false;
606 return true;
607 }
608
609 if (isClosed())
610 callback.failed(new EofException());
611 else
612 callback.failed(new WritePendingException());
613 return false;
614 }
615
616 @Override
617 public Action process() throws Exception
618 {
619 if (_callback==null)
620 throw new IllegalStateException();
621
622 ByteBuffer chunk = _chunk;
623 while (true)
624 {
625 HttpGenerator.Result result = _generator.generateResponse(_info, _header, chunk, _content, _lastContent);
626 if (LOG.isDebugEnabled())
627 LOG.debug("{} generate: {} ({},{},{})@{}",
628 this,
629 result,
630 BufferUtil.toSummaryString(_header),
631 BufferUtil.toSummaryString(_content),
632 _lastContent,
633 _generator.getState());
634
635 switch (result)
636 {
637 case NEED_HEADER:
638 {
639 _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
640 continue;
641 }
642 case NEED_CHUNK:
643 {
644 chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
645 continue;
646 }
647 case FLUSH:
648 {
649
650 if (_channel.getRequest().isHead() || _generator.isNoContent())
651 {
652 BufferUtil.clear(chunk);
653 BufferUtil.clear(_content);
654 }
655
656
657 if (BufferUtil.hasContent(_header))
658 {
659 if (BufferUtil.hasContent(_content))
660 {
661 if (BufferUtil.hasContent(chunk))
662 getEndPoint().write(this, _header, chunk, _content);
663 else
664 getEndPoint().write(this, _header, _content);
665 }
666 else
667 getEndPoint().write(this, _header);
668 }
669 else if (BufferUtil.hasContent(chunk))
670 {
671 if (BufferUtil.hasContent(_content))
672 getEndPoint().write(this, chunk, _content);
673 else
674 getEndPoint().write(this, chunk);
675 }
676 else if (BufferUtil.hasContent(_content))
677 {
678 getEndPoint().write(this, _content);
679 }
680 else
681 {
682 succeeded();
683 }
684 return Action.SCHEDULED;
685 }
686 case SHUTDOWN_OUT:
687 {
688 _shutdownOut=true;
689 continue;
690 }
691 case DONE:
692 {
693 return Action.SUCCEEDED;
694 }
695 case CONTINUE:
696 {
697 break;
698 }
699 default:
700 {
701 throw new IllegalStateException("generateResponse="+result);
702 }
703 }
704 }
705 }
706
707 private void releaseHeader()
708 {
709 ByteBuffer h=_header;
710 _header=null;
711 if (h!=null)
712 _bufferPool.release(h);
713 }
714
715 @Override
716 protected void onCompleteSuccess()
717 {
718 releaseHeader();
719 _callback.succeeded();
720 if (_shutdownOut)
721 getEndPoint().shutdownOutput();
722 }
723
724 @Override
725 public void onCompleteFailure(final Throwable x)
726 {
727 releaseHeader();
728 failedCallback(_callback,x);
729 if (_shutdownOut)
730 getEndPoint().shutdownOutput();
731 }
732
733 @Override
734 public String toString()
735 {
736 return String.format("%s[i=%s,cb=%s]",super.toString(),_info,_callback);
737 }
738 }
739
740
741 @Override
742 public void abort()
743 {
744
745
746 getEndPoint().close();
747 }
748
749 }