1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.nio.ByteBuffer;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.TimeoutException;
26
27 import org.eclipse.jetty.http.HttpGenerator;
28 import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
29 import org.eclipse.jetty.http.HttpHeader;
30 import org.eclipse.jetty.http.HttpHeaderValue;
31 import org.eclipse.jetty.http.HttpMethod;
32 import org.eclipse.jetty.http.HttpParser;
33 import org.eclipse.jetty.http.HttpStatus;
34 import org.eclipse.jetty.http.HttpVersion;
35 import org.eclipse.jetty.io.AbstractConnection;
36 import org.eclipse.jetty.io.ByteBufferPool;
37 import org.eclipse.jetty.io.Connection;
38 import org.eclipse.jetty.io.EndPoint;
39 import org.eclipse.jetty.io.EofException;
40 import org.eclipse.jetty.util.BlockingCallback;
41 import org.eclipse.jetty.util.BufferUtil;
42 import org.eclipse.jetty.util.Callback;
43 import org.eclipse.jetty.util.log.Log;
44 import org.eclipse.jetty.util.log.Logger;
45
46
47
48
49 public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport
50 {
51 public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
52 private static final boolean REQUEST_BUFFER_DIRECT=false;
53 private static final boolean HEADER_BUFFER_DIRECT=true;
54 private static final boolean CHUNK_BUFFER_DIRECT=false;
55 private static final Logger LOG = Log.getLogger(HttpConnection.class);
56 private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
57
58 private final HttpConfiguration _config;
59 private final Connector _connector;
60 private final ByteBufferPool _bufferPool;
61 private final HttpGenerator _generator;
62 private final HttpChannelOverHttp _channel;
63 private final HttpParser _parser;
64 private volatile ByteBuffer _requestBuffer = null;
65 private volatile ByteBuffer _chunk = null;
66 private BlockingCallback _readBlocker = new BlockingCallback();
67 private BlockingCallback _writeBlocker = new BlockingCallback();
68
69
70 public static HttpConnection getCurrentConnection()
71 {
72 return __currentConnection.get();
73 }
74
75 protected static void setCurrentConnection(HttpConnection connection)
76 {
77 __currentConnection.set(connection);
78 }
79
80 public HttpConfiguration getHttpConfiguration()
81 {
82 return _config;
83 }
84
/**
 * Creates an HTTP connection over the given end point.
 *
 * @param config    the HTTP configuration (header sizes, server version flag)
 * @param connector the connector that supplies the executor, buffer pool and server
 * @param endPoint  the end point this connection reads from and writes to
 */
public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
{
    super(endPoint, connector.getExecutor(), true);

    _config = config;
    _connector = connector;
    _bufferPool = connector.getByteBufferPool();

    _generator = new HttpGenerator();
    _generator.setSendServerVersion(config.getSendServerVersion());

    // The channel has to exist before the parser is created, because
    // newRequestHandler() hands the channel to the parser as its handler.
    _channel = new HttpChannelOverHttp(connector, config, endPoint, this, new Input());
    _parser = newHttpParser();

    LOG.debug("New HTTP Connection {}", this);
}
101
102 protected HttpParser newHttpParser()
103 {
104 return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
105 }
106
107 protected HttpParser.RequestHandler<ByteBuffer> newRequestHandler()
108 {
109 return _channel;
110 }
111
112 public Server getServer()
113 {
114 return _connector.getServer();
115 }
116
117 public Connector getConnector()
118 {
119 return _connector;
120 }
121
122 public HttpChannel<?> getHttpChannel()
123 {
124 return _channel;
125 }
126
127 public void reset()
128 {
129
130 if (_channel.isExpecting100Continue())
131 {
132
133 _parser.reset();
134
135 _parser.close();
136 }
137
138 else if (_generator.isPersistent())
139
140 _parser.reset();
141 else
142
143 _parser.close();
144
145 _generator.reset();
146 _channel.reset();
147
148 releaseRequestBuffer();
149 if (_chunk!=null)
150 {
151 _bufferPool.release(_chunk);
152 _chunk=null;
153 }
154 }
155
156
157 @Override
158 public int getMessagesIn()
159 {
160 return getHttpChannel().getRequests();
161 }
162
163 @Override
164 public int getMessagesOut()
165 {
166 return getHttpChannel().getRequests();
167 }
168
169 @Override
170 public String toString()
171 {
172 return String.format("%s,g=%s,p=%s",
173 super.toString(),
174 _generator,
175 _parser);
176 }
177
178 private void releaseRequestBuffer()
179 {
180 if (_requestBuffer != null && !_requestBuffer.hasRemaining())
181 {
182 ByteBuffer buffer=_requestBuffer;
183 _requestBuffer=null;
184 _bufferPool.release(buffer);
185 }
186 }
187
188
189
190
191
192
193
194
195
196 @Override
197 public void onFillable()
198 {
199 LOG.debug("{} onFillable {}", this, _channel.getState());
200
201 setCurrentConnection(this);
202 try
203 {
204 while (true)
205 {
206
207 boolean call_channel=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
208
209
210 if (!call_channel && BufferUtil.isEmpty(_requestBuffer))
211 {
212 if (_requestBuffer == null)
213 _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
214
215 int filled = getEndPoint().fill(_requestBuffer);
216 if (filled==0)
217 filled = getEndPoint().fill(_requestBuffer);
218
219 LOG.debug("{} filled {}", this, filled);
220
221
222 if (filled == 0)
223 {
224
225 releaseRequestBuffer();
226 fillInterested();
227 return;
228 }
229 else if (filled < 0)
230 {
231 _parser.shutdownInput();
232
233
234
235
236 if (getEndPoint().isOutputShutdown())
237 getEndPoint().close();
238 else
239 getEndPoint().shutdownOutput();
240
241 releaseRequestBuffer();
242 return;
243 }
244
245
246 call_channel=_parser.parseNext(_requestBuffer);
247 }
248
249
250 if (call_channel)
251 {
252
253
254
255 while (_parser.inContentState())
256 {
257 if (!_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
258 break;
259 }
260
261
262
263
264
265 _channel.run();
266
267
268 if (_channel.getState().isSuspended() || getEndPoint().getConnection()!=this)
269 return;
270 }
271 }
272 }
273 catch (EofException e)
274 {
275 LOG.debug(e);
276 }
277 catch (IOException e)
278 {
279 if (_parser.isIdle())
280 LOG.debug(e);
281 else
282 LOG.warn(this.toString(), e);
283 close();
284 }
285 catch (Exception e)
286 {
287 LOG.warn(this.toString(), e);
288 close();
289 }
290 finally
291 {
292 setCurrentConnection(null);
293 }
294 }
295
296 @Override
297 public void onOpen()
298 {
299 super.onOpen();
300 fillInterested();
301 }
302
303 @Override
304 public void run()
305 {
306 onFillable();
307 }
308
309
310 @Override
311 public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
312 {
313
314 if (_channel.isExpecting100Continue())
315
316 _generator.setPersistent(false);
317
318
319 ByteBuffer header = null;
320 ByteBuffer chunk = null;
321 out: while (true)
322 {
323 HttpGenerator.Result result = _generator.generateResponse(info, header, chunk, content, lastContent);
324 if (LOG.isDebugEnabled())
325 LOG.debug("{} generate: {} ({},{},{})@{}",
326 this,
327 result,
328 BufferUtil.toSummaryString(header),
329 BufferUtil.toSummaryString(content),
330 lastContent,
331 _generator.getState());
332
333 switch (result)
334 {
335 case NEED_HEADER:
336 {
337 if (lastContent && content!=null && BufferUtil.space(content)>_config.getResponseHeaderSize() && content.hasArray() )
338 {
339
340 int p=content.position();
341 int l=content.limit();
342 content.position(l);
343 content.limit(l+_config.getResponseHeaderSize());
344 header=content.slice();
345 header.limit(0);
346 content.position(p);
347 content.limit(l);
348 }
349 else
350 header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
351 continue;
352 }
353 case NEED_CHUNK:
354 {
355 chunk = _chunk;
356 if (chunk==null)
357 chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
358 continue;
359 }
360 case FLUSH:
361 {
362
363 if (_channel.getRequest().isHead())
364 {
365 BufferUtil.clear(chunk);
366 BufferUtil.clear(content);
367 }
368
369
370 if (BufferUtil.hasContent(header))
371 {
372
373 if (BufferUtil.hasContent(content))
374 blockingWrite(header, content);
375 else
376 blockingWrite(header);
377
378 }
379 else if (BufferUtil.hasContent(chunk))
380 {
381 if (BufferUtil.hasContent(content))
382 blockingWrite(chunk,content);
383 else
384 blockingWrite(chunk);
385 }
386 else if (BufferUtil.hasContent(content))
387 {
388 blockingWrite(content);
389 }
390 continue;
391 }
392 case SHUTDOWN_OUT:
393 {
394 getEndPoint().shutdownOutput();
395 continue;
396 }
397 case DONE:
398 {
399 if (header!=null)
400 {
401
402 if (!lastContent || content==null || !content.hasArray() || !header.hasArray() || content.array()!=header.array())
403 _bufferPool.release(header);
404 }
405 if (chunk!=null)
406 _bufferPool.release(chunk);
407 break out;
408 }
409 case CONTINUE:
410 {
411 break;
412 }
413 default:
414 {
415 throw new IllegalStateException("generateResponse="+result);
416 }
417 }
418 }
419 }
420
421 @Override
422 public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
423 {
424 try
425 {
426 send(info,content,lastContent);
427 callback.succeeded();
428 }
429 catch (IOException e)
430 {
431 callback.failed(e);
432 }
433 }
434
435 private void blockingWrite(ByteBuffer... bytes) throws IOException
436 {
437 try
438 {
439 getEndPoint().write(_writeBlocker, bytes);
440 _writeBlocker.block();
441 }
442 catch (InterruptedException x)
443 {
444 throw (IOException)new InterruptedIOException().initCause(x);
445 }
446 catch (TimeoutException e)
447 {
448 throw new IOException(e);
449 }
450 }
451
452 @Override
453 public void completed()
454 {
455
456 if (_parser.isInContent() && _generator.isPersistent() && !_channel.isExpecting100Continue())
457
458 _channel.getRequest().getHttpInput().consumeAll();
459
460
461 if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
462 {
463 Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
464 if (connection != null)
465 {
466 LOG.debug("Upgrade from {} to {}", this, connection);
467 onClose();
468 getEndPoint().setConnection(connection);
469 connection.onOpen();
470 reset();
471 return;
472 }
473 }
474
475 reset();
476
477
478 if (getCurrentConnection()==null)
479 {
480 if (_parser.isStart())
481 {
482
483 if (_requestBuffer == null)
484 {
485 fillInterested();
486 }
487 else if (getConnector().isStarted())
488 {
489 LOG.debug("{} pipelined", this);
490
491 try
492 {
493 getExecutor().execute(this);
494 }
495 catch (RejectedExecutionException e)
496 {
497 if (getConnector().isStarted())
498 LOG.warn(e);
499 else
500 LOG.ignore(e);
501 getEndPoint().close();
502 }
503 }
504 else
505 {
506 getEndPoint().close();
507 }
508 }
509
510
511
512 if (getEndPoint().isOpen() && getEndPoint().isOutputShutdown())
513 {
514 fillInterested();
515 }
516 }
517 }
518
519 public ByteBuffer getRequestBuffer()
520 {
521 return _requestBuffer;
522 }
523
524 private class Input extends ByteBufferHttpInput
525 {
526 @Override
527 protected void blockForContent() throws IOException
528 {
529
530
531
532
533
534 try
535 {
536 while (true)
537 {
538
539 boolean event=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
540
541
542
543 while (event && BufferUtil.hasContent(_requestBuffer) && _parser.inContentState())
544 _parser.parseNext(_requestBuffer);
545
546
547 if (event)
548 return;
549
550
551 if (BufferUtil.isEmpty(_requestBuffer))
552 {
553
554 if (getEndPoint().isInputShutdown())
555 {
556 _parser.shutdownInput();
557 return;
558 }
559
560
561 getEndPoint().fillInterested(_readBlocker);
562 LOG.debug("{} block readable on {}",this,_readBlocker);
563 _readBlocker.block();
564
565
566 if (_requestBuffer==null)
567 {
568 long content_length=_channel.getRequest().getContentLength();
569 int size=getInputBufferSize();
570 if (size<content_length)
571 size=size*4;
572 _requestBuffer=_bufferPool.acquire(size,REQUEST_BUFFER_DIRECT);
573 }
574
575
576 int filled=getEndPoint().fill(_requestBuffer);
577 LOG.debug("{} block filled {}",this,filled);
578 if (filled<0)
579 {
580 _parser.shutdownInput();
581 return;
582 }
583 }
584 }
585 }
586 catch (TimeoutException e)
587 {
588 throw new EofException(e);
589 }
590 catch (final InterruptedException x)
591 {
592 throw new InterruptedIOException(getEndPoint().toString()){{initCause(x);}};
593 }
594 }
595
596 @Override
597 protected void onContentQueued(ByteBuffer ref)
598 {
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614 }
615
616
617 @Override
618 protected void onAllContentConsumed()
619 {
620
621
622
623
624 releaseRequestBuffer();
625 }
626 }
627
628 private class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
629 {
630 public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
631 {
632 super(connector,config,endPoint,transport,input);
633 }
634
635 @Override
636 public void badMessage(int status, String reason)
637 {
638 _generator.setPersistent(false);
639 super.badMessage(status,reason);
640 }
641
642 @Override
643 public boolean headerComplete()
644 {
645 boolean persistent;
646 HttpVersion version = getHttpVersion();
647
648 switch (version)
649 {
650 case HTTP_0_9:
651 {
652 persistent = false;
653 break;
654 }
655 case HTTP_1_0:
656 {
657 persistent = getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString());
658 if (!persistent)
659 persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
660 if (persistent)
661 getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE);
662 break;
663 }
664 case HTTP_1_1:
665 {
666 persistent = !getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
667 if (!persistent)
668 persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
669 if (!persistent)
670 getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
671 break;
672 }
673 default:
674 {
675 throw new IllegalStateException();
676 }
677 }
678
679 if (!persistent)
680 _generator.setPersistent(false);
681
682 return super.headerComplete();
683 }
684
685 @Override
686 protected void handleException(Throwable x)
687 {
688 _generator.setPersistent(false);
689 super.handleException(x);
690 }
691
692 }
693
694
695 }