1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.WritePendingException;
24 import java.util.concurrent.RejectedExecutionException;
25
26 import org.eclipse.jetty.http.HttpGenerator;
27 import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
28 import org.eclipse.jetty.http.HttpHeader;
29 import org.eclipse.jetty.http.HttpHeaderValue;
30 import org.eclipse.jetty.http.HttpMethod;
31 import org.eclipse.jetty.http.HttpParser;
32 import org.eclipse.jetty.http.HttpStatus;
33 import org.eclipse.jetty.http.HttpVersion;
34 import org.eclipse.jetty.io.AbstractConnection;
35 import org.eclipse.jetty.io.ByteBufferPool;
36 import org.eclipse.jetty.io.Connection;
37 import org.eclipse.jetty.io.EndPoint;
38 import org.eclipse.jetty.io.EofException;
39 import org.eclipse.jetty.util.BufferUtil;
40 import org.eclipse.jetty.util.Callback;
41 import org.eclipse.jetty.util.IteratingCallback;
42 import org.eclipse.jetty.util.log.Log;
43 import org.eclipse.jetty.util.log.Logger;
44
45
46
47
48 public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport
49 {
50 public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
51 private static final boolean REQUEST_BUFFER_DIRECT=false;
52 private static final boolean HEADER_BUFFER_DIRECT=false;
53 private static final boolean CHUNK_BUFFER_DIRECT=false;
54 private static final Logger LOG = Log.getLogger(HttpConnection.class);
55 private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
56
57 private final HttpConfiguration _config;
58 private final Connector _connector;
59 private final ByteBufferPool _bufferPool;
60 private final HttpGenerator _generator;
61 private final HttpChannelOverHttp _channel;
62 private final HttpParser _parser;
63 private volatile ByteBuffer _requestBuffer = null;
64 private volatile ByteBuffer _chunk = null;
65 private final SendCallback _sendCallback = new SendCallback();
66
67
68
69
70
71
72
73
74
75 public static HttpConnection getCurrentConnection()
76 {
77 return __currentConnection.get();
78 }
79
80 protected static HttpConnection setCurrentConnection(HttpConnection connection)
81 {
82 HttpConnection last=__currentConnection.get();
83 if (connection==null)
84 __currentConnection.remove();
85 else
86 __currentConnection.set(connection);
87 return last;
88 }
89
90 public HttpConfiguration getHttpConfiguration()
91 {
92 return _config;
93 }
94
/**
 * Builds a connection over the given endpoint and wires up its generator,
 * channel and parser through the {@code newXxx()} factory methods.
 *
 * @param config    HTTP configuration used by the generator, channel and parser factories
 * @param connector connector supplying the executor and the buffer pool
 * @param endPoint  endpoint this connection reads from and writes to
 */
public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
{
    super(endPoint, connector.getExecutor(), true);

    _config = config;
    _connector = connector;
    _bufferPool = connector.getByteBufferPool();

    // Order matters: the parser factory asks newRequestHandler() for the
    // channel, so the channel has to exist before the parser is created.
    _generator = newHttpGenerator();
    _channel = newHttpChannel(newHttpInput());
    _parser = newHttpParser();

    if (LOG.isDebugEnabled())
    {
        LOG.debug("New HTTP Connection {}", this);
    }
}
111
112 protected HttpGenerator newHttpGenerator()
113 {
114 return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
115 }
116
117 protected HttpInput<ByteBuffer> newHttpInput()
118 {
119 return new HttpInputOverHTTP(this);
120 }
121
122 protected HttpChannelOverHttp newHttpChannel(HttpInput<ByteBuffer> httpInput)
123 {
124 return new HttpChannelOverHttp(_connector, _config, getEndPoint(), this, httpInput);
125 }
126
127 protected HttpParser newHttpParser()
128 {
129 return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
130 }
131
132 protected HttpParser.RequestHandler<ByteBuffer> newRequestHandler()
133 {
134 return _channel;
135 }
136
137 public Server getServer()
138 {
139 return _connector.getServer();
140 }
141
142 public Connector getConnector()
143 {
144 return _connector;
145 }
146
147 public HttpChannel<?> getHttpChannel()
148 {
149 return _channel;
150 }
151
152 public HttpParser getParser()
153 {
154 return _parser;
155 }
156
157 @Override
158 public int getMessagesIn()
159 {
160 return getHttpChannel().getRequests();
161 }
162
163 @Override
164 public int getMessagesOut()
165 {
166 return getHttpChannel().getRequests();
167 }
168
169 void releaseRequestBuffer()
170 {
171 if (_requestBuffer != null && !_requestBuffer.hasRemaining())
172 {
173 ByteBuffer buffer=_requestBuffer;
174 _requestBuffer=null;
175 _bufferPool.release(buffer);
176 }
177 }
178
179 public ByteBuffer getRequestBuffer()
180 {
181 if (_requestBuffer == null)
182 _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
183 return _requestBuffer;
184 }
185
186
187
188
189
190
191
192
193
194 @Override
195 public void onFillable()
196 {
197 if (LOG.isDebugEnabled())
198 LOG.debug("{} onFillable {}", this, _channel.getState());
199
200 final HttpConnection last=setCurrentConnection(this);
201 int filled=Integer.MAX_VALUE;
202 boolean suspended=false;
203 try
204 {
205
206 while (!suspended && getEndPoint().getConnection()==this)
207 {
208
209 if (BufferUtil.isEmpty(_requestBuffer))
210 {
211
212 if (filled<=0)
213 break;
214
215
216 if(getEndPoint().isInputShutdown())
217 {
218
219 filled=-1;
220 _parser.atEOF();
221 }
222 else
223 {
224
225
226
227 _requestBuffer = getRequestBuffer();
228
229
230 filled = getEndPoint().fill(_requestBuffer);
231 if (filled==0)
232 filled = getEndPoint().fill(_requestBuffer);
233
234
235 if (filled < 0)
236 _parser.atEOF();
237 }
238 }
239
240
241 if (_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
242 {
243
244
245
246
247
248 suspended = !_channel.handle();
249 }
250 else
251 {
252
253
254
255 releaseRequestBuffer();
256 }
257 }
258 }
259 catch (EofException e)
260 {
261 LOG.debug(e);
262 }
263 catch (Exception e)
264 {
265 if (_parser.isIdle())
266 LOG.debug(e);
267 else
268 LOG.warn(this.toString(), e);
269 close();
270 }
271 finally
272 {
273 setCurrentConnection(last);
274 if (!suspended && getEndPoint().isOpen() && getEndPoint().getConnection()==this)
275 {
276 fillInterested();
277 }
278 }
279 }
280
281
282
283
284
285 protected void parseContent() throws IOException
286 {
287
288
289
290
291 ByteBuffer requestBuffer = getRequestBuffer();
292
293 while (_parser.inContentState())
294 {
295
296 boolean parsed = _parser.parseNext(requestBuffer==null?BufferUtil.EMPTY_BUFFER:requestBuffer);
297
298
299 if (BufferUtil.isEmpty(requestBuffer) && getEndPoint().isInputShutdown())
300 {
301 _parser.atEOF();
302 if (parsed)
303 break;
304 continue;
305 }
306
307 if (parsed)
308 break;
309
310
311 int filled=getEndPoint().fill(requestBuffer);
312 if (LOG.isDebugEnabled())
313 LOG.debug("{} filled {}",this,filled);
314 if (filled<=0)
315 {
316 if (filled<0)
317 {
318 _parser.atEOF();
319 continue;
320 }
321 break;
322 }
323 }
324 }
325
326 @Override
327 public void completed()
328 {
329
330 if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
331 {
332 Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
333 if (connection != null)
334 {
335 if (LOG.isDebugEnabled())
336 LOG.debug("Upgrade from {} to {}", this, connection);
337 onClose();
338 getEndPoint().setConnection(connection);
339 connection.onOpen();
340 _channel.reset();
341 _parser.reset();
342 _generator.reset();
343 releaseRequestBuffer();
344 return;
345 }
346 }
347
348
349
350 if (_channel.isExpecting100Continue())
351
352 _parser.close();
353 else if (_parser.inContentState() && _generator.isPersistent())
354
355 _channel.getRequest().getHttpInput().consumeAll();
356
357
358 _channel.reset();
359 if (_generator.isPersistent() && !_parser.isClosed())
360 _parser.reset();
361 else
362 _parser.close();
363
364
365
366 releaseRequestBuffer();
367 if (_chunk!=null)
368 _bufferPool.release(_chunk);
369 _chunk=null;
370 _generator.reset();
371
372
373 if (getCurrentConnection()!=this)
374 {
375
376 if (_parser.isStart())
377 {
378
379 if (BufferUtil.isEmpty(_requestBuffer))
380 {
381
382 fillInterested();
383 }
384
385 else if (getConnector().isRunning())
386 {
387
388 try
389 {
390 getExecutor().execute(this);
391 }
392 catch (RejectedExecutionException e)
393 {
394 if (getConnector().isRunning())
395 LOG.warn(e);
396 else
397 LOG.ignore(e);
398 getEndPoint().close();
399 }
400 }
401 else
402 {
403 getEndPoint().close();
404 }
405 }
406
407 else if (getEndPoint().isOpen())
408 fillInterested();
409 }
410 }
411
412 @Override
413 protected void onFillInterestedFailed(Throwable cause)
414 {
415 _parser.close();
416 super.onFillInterestedFailed(cause);
417 }
418
419 @Override
420 public void onOpen()
421 {
422 super.onOpen();
423 fillInterested();
424 }
425
426 @Override
427 public void onClose()
428 {
429 _sendCallback.close();
430 super.onClose();
431 }
432
433 @Override
434 public void run()
435 {
436 onFillable();
437 }
438
439 @Override
440 public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
441 {
442
443 if (info!=null && _channel.isExpecting100Continue())
444
445 _generator.setPersistent(false);
446
447 if(_sendCallback.reset(info,content,lastContent,callback))
448 _sendCallback.iterate();
449 }
450
451 @Override
452 public void send(ByteBuffer content, boolean lastContent, Callback callback)
453 {
454 if (!lastContent && BufferUtil.isEmpty(content))
455 callback.succeeded();
456 else if (_sendCallback.reset(null,content,lastContent,callback))
457 _sendCallback.iterate();
458 }
459
460 protected class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
461 {
462 public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
463 {
464 super(connector,config,endPoint,transport,input);
465 }
466
467 @Override
468 public void earlyEOF()
469 {
470
471 if (getRequest().getMethod()==null)
472 close();
473 else
474 super.earlyEOF();
475 }
476
477 @Override
478 public boolean content(ByteBuffer item)
479 {
480 super.content(item);
481 return true;
482 }
483
484 @Override
485 public void badMessage(int status, String reason)
486 {
487 _generator.setPersistent(false);
488 super.badMessage(status,reason);
489 }
490
491 @Override
492 public boolean headerComplete()
493 {
494 boolean persistent;
495 HttpVersion version = getHttpVersion();
496
497 switch (version)
498 {
499 case HTTP_0_9:
500 {
501 persistent = false;
502 break;
503 }
504 case HTTP_1_0:
505 {
506 persistent = getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString());
507 if (!persistent)
508 persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
509 if (persistent)
510 getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE);
511 break;
512 }
513 case HTTP_1_1:
514 {
515 persistent = !getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
516 if (!persistent)
517 persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
518 if (!persistent)
519 getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
520 break;
521 }
522 default:
523 {
524 throw new IllegalStateException();
525 }
526 }
527
528 if (!persistent)
529 _generator.setPersistent(false);
530
531 return super.headerComplete();
532 }
533
534 @Override
535 protected void handleException(Throwable x)
536 {
537 _generator.setPersistent(false);
538 super.handleException(x);
539 }
540
541 @Override
542 public void abort()
543 {
544 super.abort();
545 _generator.setPersistent(false);
546 }
547
548 @Override
549 public boolean messageComplete()
550 {
551 super.messageComplete();
552 return false;
553 }
554 }
555
556 private class SendCallback extends IteratingCallback
557 {
558 private ResponseInfo _info;
559 private ByteBuffer _content;
560 private boolean _lastContent;
561 private Callback _callback;
562 private ByteBuffer _header;
563 private boolean _shutdownOut;
564
565 private SendCallback()
566 {
567 super(true);
568 }
569
570 private boolean reset(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
571 {
572 if (reset())
573 {
574 _info = info;
575 _content = content;
576 _lastContent = last;
577 _callback = callback;
578 _header = null;
579 _shutdownOut = false;
580 return true;
581 }
582
583 if (isClosed())
584 callback.failed(new EofException());
585 else
586 callback.failed(new WritePendingException());
587 return false;
588 }
589
590 @Override
591 public Action process() throws Exception
592 {
593 if (_callback==null)
594 throw new IllegalStateException();
595
596 ByteBuffer chunk = _chunk;
597 while (true)
598 {
599 HttpGenerator.Result result = _generator.generateResponse(_info, _header, chunk, _content, _lastContent);
600 if (LOG.isDebugEnabled())
601 LOG.debug("{} generate: {} ({},{},{})@{}",
602 this,
603 result,
604 BufferUtil.toSummaryString(_header),
605 BufferUtil.toSummaryString(_content),
606 _lastContent,
607 _generator.getState());
608
609 switch (result)
610 {
611 case NEED_HEADER:
612 {
613 _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
614 continue;
615 }
616 case NEED_CHUNK:
617 {
618 chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
619 continue;
620 }
621 case FLUSH:
622 {
623
624 if (_channel.getRequest().isHead() || _generator.isNoContent())
625 {
626 BufferUtil.clear(chunk);
627 BufferUtil.clear(_content);
628 }
629
630
631 if (BufferUtil.hasContent(_header))
632 {
633 if (BufferUtil.hasContent(_content))
634 {
635 if (BufferUtil.hasContent(chunk))
636 getEndPoint().write(this, _header, chunk, _content);
637 else
638 getEndPoint().write(this, _header, _content);
639 }
640 else
641 getEndPoint().write(this, _header);
642 }
643 else if (BufferUtil.hasContent(chunk))
644 {
645 if (BufferUtil.hasContent(_content))
646 getEndPoint().write(this, chunk, _content);
647 else
648 getEndPoint().write(this, chunk);
649 }
650 else if (BufferUtil.hasContent(_content))
651 {
652 getEndPoint().write(this, _content);
653 }
654 else
655 {
656 succeeded();
657 }
658 return Action.SCHEDULED;
659 }
660 case SHUTDOWN_OUT:
661 {
662 _shutdownOut=true;
663 continue;
664 }
665 case DONE:
666 {
667 return Action.SUCCEEDED;
668 }
669 case CONTINUE:
670 {
671 break;
672 }
673 default:
674 {
675 throw new IllegalStateException("generateResponse="+result);
676 }
677 }
678 }
679 }
680
681 private void releaseHeader()
682 {
683 ByteBuffer h=_header;
684 _header=null;
685 if (h!=null)
686 _bufferPool.release(h);
687 }
688
689 @Override
690 protected void onCompleteSuccess()
691 {
692 releaseHeader();
693 _callback.succeeded();
694 if (_shutdownOut)
695 getEndPoint().shutdownOutput();
696 }
697
698 @Override
699 public void onCompleteFailure(final Throwable x)
700 {
701 releaseHeader();
702 failedCallback(_callback,x);
703 if (_shutdownOut)
704 getEndPoint().shutdownOutput();
705 }
706
707 @Override
708 public String toString()
709 {
710 return String.format("%s[i=%s,cb=%s]",super.toString(),_info,_callback);
711 }
712 }
713
714
715 @Override
716 public void abort()
717 {
718
719
720 getEndPoint().close();
721 }
722
723 }