View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.nio.ByteBuffer;
24  import java.util.concurrent.RejectedExecutionException;
25  import java.util.concurrent.TimeoutException;
26  
27  import org.eclipse.jetty.http.HttpGenerator;
28  import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
29  import org.eclipse.jetty.http.HttpHeader;
30  import org.eclipse.jetty.http.HttpHeaderValue;
31  import org.eclipse.jetty.http.HttpMethod;
32  import org.eclipse.jetty.http.HttpParser;
33  import org.eclipse.jetty.http.HttpStatus;
34  import org.eclipse.jetty.http.HttpVersion;
35  import org.eclipse.jetty.io.AbstractConnection;
36  import org.eclipse.jetty.io.ByteBufferPool;
37  import org.eclipse.jetty.io.Connection;
38  import org.eclipse.jetty.io.EndPoint;
39  import org.eclipse.jetty.io.EofException;
40  import org.eclipse.jetty.util.BlockingCallback;
41  import org.eclipse.jetty.util.BufferUtil;
42  import org.eclipse.jetty.util.Callback;
43  import org.eclipse.jetty.util.log.Log;
44  import org.eclipse.jetty.util.log.Logger;
45  
46  /**
47   * <p>A {@link Connection} that handles the HTTP protocol.</p>
48   */
49  public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport
50  {
51      public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
52      private static final boolean REQUEST_BUFFER_DIRECT=false;
53      private static final boolean HEADER_BUFFER_DIRECT=true;
54      private static final boolean CHUNK_BUFFER_DIRECT=false;
55      private static final Logger LOG = Log.getLogger(HttpConnection.class);
56      private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
57  
58      private final HttpConfiguration _config;
59      private final Connector _connector;
60      private final ByteBufferPool _bufferPool;
61      private final HttpGenerator _generator;
62      private final HttpChannelOverHttp _channel;
63      private final HttpParser _parser;
64      private volatile ByteBuffer _requestBuffer = null;
65      private volatile ByteBuffer _chunk = null;
66      private BlockingCallback _readBlocker = new BlockingCallback();
67      private BlockingCallback _writeBlocker = new BlockingCallback();
68  
69  
70      public static HttpConnection getCurrentConnection()
71      {
72          return __currentConnection.get();
73      }
74  
75      protected static void setCurrentConnection(HttpConnection connection)
76      {
77          __currentConnection.set(connection);
78      }
79  
80      public HttpConfiguration getHttpConfiguration()
81      {
82          return _config;
83      }
84  
85      public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
86      {
87          // Tell AbstractConnector executeOnFillable==true because we want the same thread that
88          // does the HTTP parsing to handle the request so its cache is hot
89          super(endPoint, connector.getExecutor(),true);
90  
91          _config = config;
92          _connector = connector;
93          _bufferPool = _connector.getByteBufferPool();
94          _generator = new HttpGenerator();
95          _generator.setSendServerVersion(_config.getSendServerVersion());
96          _channel = new HttpChannelOverHttp(connector, config, endPoint, this, new Input());
97          _parser = newHttpParser();
98  
99          LOG.debug("New HTTP Connection {}", this);
100     }
101 
102     protected HttpParser newHttpParser()
103     {
104         return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
105     }
106 
107     protected HttpParser.RequestHandler<ByteBuffer> newRequestHandler()
108     {
109         return _channel;
110     }
111 
112     public Server getServer()
113     {
114         return _connector.getServer();
115     }
116 
117     public Connector getConnector()
118     {
119         return _connector;
120     }
121 
122     public HttpChannel<?> getHttpChannel()
123     {
124         return _channel;
125     }
126 
127     public void reset()
128     {
129         // If we are still expecting
130         if (_channel.isExpecting100Continue())
131         {
132             // reset to avoid seeking remaining content
133             _parser.reset();
134             // close to seek EOF
135             _parser.close();
136         }
137         // else if we are persistent
138         else if (_generator.isPersistent())
139             // reset to seek next request
140             _parser.reset();
141         else
142             // else seek EOF
143             _parser.close();
144 
145         _generator.reset();
146         _channel.reset();
147 
148         releaseRequestBuffer();
149         if (_chunk!=null)
150         {
151             _bufferPool.release(_chunk);
152             _chunk=null;
153         }
154     }
155 
156 
157     @Override
158     public int getMessagesIn()
159     {
160         return getHttpChannel().getRequests();
161     }
162 
163     @Override
164     public int getMessagesOut()
165     {
166         return getHttpChannel().getRequests();
167     }
168 
169     @Override
170     public String toString()
171     {
172         return String.format("%s,g=%s,p=%s",
173                 super.toString(),
174                 _generator,
175                 _parser);
176     }
177 
178     private void releaseRequestBuffer()
179     {
180         if (_requestBuffer != null && !_requestBuffer.hasRemaining())
181         {
182             ByteBuffer buffer=_requestBuffer;
183             _requestBuffer=null;
184             _bufferPool.release(buffer);
185         }
186     }
187 
188     /**
189      * <p>Parses and handles HTTP messages.</p>
190      * <p>This method is called when this {@link Connection} is ready to read bytes from the {@link EndPoint}.
191      * However, it can also be called if there is unconsumed data in the _requestBuffer, as a result of
192      * resuming a suspended request when there is a pipelined request already read into the buffer.</p>
193      * <p>This method fills bytes and parses them until either: EOF is filled; 0 bytes are filled;
194      * the HttpChannel finishes handling; or the connection has changed.</p>
195      */
196     @Override
197     public void onFillable()
198     {
199         LOG.debug("{} onFillable {}", this, _channel.getState());
200 
201         setCurrentConnection(this);
202         try
203         {
204             while (true)
205             {
206                 // Can the parser progress (even with an empty buffer)
207                 boolean call_channel=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
208 
209                 // If there is a request buffer, we are re-entering here
210                 if (!call_channel && BufferUtil.isEmpty(_requestBuffer))
211                 {
212                     if (_requestBuffer == null)
213                         _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
214 
215                     int filled = getEndPoint().fill(_requestBuffer);
216                     if (filled==0) // Do a retry on fill 0 (optimisation for SSL connections)
217                         filled = getEndPoint().fill(_requestBuffer);
218 
219                     LOG.debug("{} filled {}", this, filled);
220 
221                     // If we failed to fill
222                     if (filled == 0)
223                     {
224                         // Somebody wanted to read, we didn't so schedule another attempt
225                         releaseRequestBuffer();
226                         fillInterested();
227                         return;
228                     }
229                     else if (filled < 0)
230                     {
231                         _parser.shutdownInput();
232                         // We were only filling if fully consumed, so if we have
233                         // read -1 then we have nothing to parse and thus nothing that
234                         // will generate a response.  If we had a suspended request pending
235                         // a response or a request waiting in the buffer, we would not be here.
236                         if (getEndPoint().isOutputShutdown())
237                             getEndPoint().close();
238                         else
239                             getEndPoint().shutdownOutput();
240                         // buffer must be empty and the channel must be idle, so we can release.
241                         releaseRequestBuffer();
242                         return;
243                     }
244 
245                     // Parse what we have read
246                     call_channel=_parser.parseNext(_requestBuffer);
247                 }
248 
249                 // Parse the buffer
250                 if (call_channel)
251                 {
252                     // Parse as much content as there is available before calling the channel
253                     // this is both efficient (may queue many chunks), will correctly set available for 100 continues
254                     // and will drive the parser to completion if all content is available.
255                     while (_parser.inContentState())
256                     {
257                         if (!_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
258                             break;
259                     }
260 
261                     // The parser returned true, which indicates the channel is ready to handle a request.
262                     // Call the channel and this will either handle the request/response to completion OR,
263                     // if the request suspends, the request/response will be incomplete so the outer loop will exit.
264                         
265                     _channel.run();
266                     
267                     // Return if suspended or upgraded
268                     if (_channel.getState().isSuspended() || getEndPoint().getConnection()!=this)
269                         return;
270                 }
271             }
272         }
273         catch (EofException e)
274         {
275             LOG.debug(e);
276         }
277         catch (IOException e)
278         {
279             if (_parser.isIdle())
280                 LOG.debug(e);
281             else
282                 LOG.warn(this.toString(), e);
283             close();
284         }
285         catch (Exception e)
286         {
287             LOG.warn(this.toString(), e);
288             close();
289         }
290         finally
291         {
292             setCurrentConnection(null);
293         }
294     }
295 
296     @Override
297     public void onOpen()
298     {
299         super.onOpen();
300         fillInterested();
301     }
302 
303     @Override
304     public void run()
305     {
306         onFillable();
307     }
308 
309 
310     @Override
311     public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent) throws IOException
312     {
313         // If we are still expecting a 100 continues
314         if (_channel.isExpecting100Continue())
315             // then we can't be persistent
316             _generator.setPersistent(false);
317 
318 
319         ByteBuffer header = null;
320         ByteBuffer chunk = null;
321         out: while (true)
322         {
323             HttpGenerator.Result result = _generator.generateResponse(info, header, chunk, content, lastContent);
324             if (LOG.isDebugEnabled())
325                 LOG.debug("{} generate: {} ({},{},{})@{}",
326                         this,
327                         result,
328                         BufferUtil.toSummaryString(header),
329                         BufferUtil.toSummaryString(content),
330                         lastContent,
331                         _generator.getState());
332 
333             switch (result)
334             {
335                 case NEED_HEADER:
336                 {
337                     if (lastContent && content!=null && BufferUtil.space(content)>_config.getResponseHeaderSize() && content.hasArray() )
338                     {
339                         // use spare space in content buffer for header buffer
340                         int p=content.position();
341                         int l=content.limit();
342                         content.position(l);
343                         content.limit(l+_config.getResponseHeaderSize());
344                         header=content.slice();
345                         header.limit(0);
346                         content.position(p);
347                         content.limit(l);
348                     }
349                     else
350                         header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
351                     continue;
352                 }
353                 case NEED_CHUNK:
354                 {
355                     chunk = _chunk;
356                     if (chunk==null)
357                         chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
358                     continue;
359                 }
360                 case FLUSH:
361                 {
362                     // Don't write the chunk or the content if this is a HEAD response
363                     if (_channel.getRequest().isHead())
364                     {
365                         BufferUtil.clear(chunk);
366                         BufferUtil.clear(content);
367                     }
368 
369                     // If we have a header
370                     if (BufferUtil.hasContent(header))
371                     {
372                         // we know there will not be a chunk, so write either header+content or just the header
373                         if (BufferUtil.hasContent(content))
374                             blockingWrite(header, content);
375                         else
376                             blockingWrite(header);
377 
378                     }
379                     else if (BufferUtil.hasContent(chunk))
380                     {
381                         if (BufferUtil.hasContent(content))
382                             blockingWrite(chunk,content);
383                         else
384                             blockingWrite(chunk);
385                     }
386                     else if (BufferUtil.hasContent(content))
387                     {
388                         blockingWrite(content);
389                     }
390                     continue;
391                 }
392                 case SHUTDOWN_OUT:
393                 {
394                     getEndPoint().shutdownOutput();
395                     continue;
396                 }
397                 case DONE:
398                 {
399                     if (header!=null)
400                     {
401                         // don't release header in spare content buffer
402                         if (!lastContent || content==null || !content.hasArray() || !header.hasArray() ||  content.array()!=header.array())
403                             _bufferPool.release(header);
404                     }
405                     if (chunk!=null)
406                         _bufferPool.release(chunk);
407                     break out;
408                 }
409                 case CONTINUE:
410                 {
411                     break;
412                 }
413                 default:
414                 {
415                     throw new IllegalStateException("generateResponse="+result);
416                 }
417             }
418         }
419     }
420 
421     @Override
422     public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
423     {
424         try
425         {
426             send(info,content,lastContent);
427             callback.succeeded();
428         }
429         catch (IOException e)
430         {
431             callback.failed(e);
432         }
433     }
434 
435     private void blockingWrite(ByteBuffer... bytes) throws IOException
436     {
437         try
438         {
439             getEndPoint().write(_writeBlocker, bytes);
440             _writeBlocker.block();
441         }
442         catch (InterruptedException x)
443         {
444             throw (IOException)new InterruptedIOException().initCause(x);
445         }
446         catch (TimeoutException e)
447         {
448             throw new IOException(e);
449         }
450     }
451 
452     @Override
453     public void completed()
454     {
455         // Finish consuming the request
456         if (_parser.isInContent() && _generator.isPersistent() && !_channel.isExpecting100Continue())
457             // Complete reading the request
458             _channel.getRequest().getHttpInput().consumeAll();
459 
460         // Handle connection upgrades
461         if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
462         {
463             Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
464             if (connection != null)
465             {
466                 LOG.debug("Upgrade from {} to {}", this, connection);
467                 onClose();
468                 getEndPoint().setConnection(connection);
469                 connection.onOpen();
470                 reset();
471                 return;
472             }
473         }
474 
475         reset();
476 
477         // if we are not called from the onfillable thread, schedule completion
478         if (getCurrentConnection()==null)
479         {
480             if (_parser.isStart())
481             {
482                 // it wants to eat more
483                 if (_requestBuffer == null)
484                 {
485                     fillInterested();
486                 }
487                 else if (getConnector().isStarted())
488                 {
489                     LOG.debug("{} pipelined", this);
490 
491                     try
492                     {
493                         getExecutor().execute(this);
494                     }
495                     catch (RejectedExecutionException e)
496                     {
497                         if (getConnector().isStarted())
498                             LOG.warn(e);
499                         else
500                             LOG.ignore(e);
501                         getEndPoint().close();
502                     }
503                 }
504                 else
505                 {
506                     getEndPoint().close();
507                 }
508             }
509 
510             // make sure that an oshut connection is driven towards close
511             // TODO this is a little ugly
512             if (getEndPoint().isOpen() && getEndPoint().isOutputShutdown())
513             {
514                 fillInterested();
515             }
516         }
517     }
518 
519     public ByteBuffer getRequestBuffer()
520     {
521         return _requestBuffer;
522     }
523 
524     private class Input extends ByteBufferHttpInput
525     {
526         @Override
527         protected void blockForContent() throws IOException
528         {
529             /* We extend the blockForContent method to replace the
530             default implementation of a blocking queue with an implementation
531             that uses the calling thread to block on a readable callback and
532             then to do the parsing before before attempting the read.
533             */
534             try
535             {
536                 while (true)
537                 {
538                     // Can the parser progress (even with an empty buffer)
539                     boolean event=_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
540 
541                     // If there is more content to parse, loop so we can queue all content from this buffer now without the
542                     // need to call blockForContent again
543                     while (event && BufferUtil.hasContent(_requestBuffer) && _parser.inContentState())
544                         _parser.parseNext(_requestBuffer);
545 
546                     // If we have an event, return
547                     if (event)
548                         return;
549 
550                     // Do we have content ready to parse?
551                     if (BufferUtil.isEmpty(_requestBuffer))
552                     {
553                         // If no more input
554                         if (getEndPoint().isInputShutdown())
555                         {
556                             _parser.shutdownInput();
557                             return;
558                         }
559 
560                         // Wait until we can read
561                         getEndPoint().fillInterested(_readBlocker);
562                         LOG.debug("{} block readable on {}",this,_readBlocker);
563                         _readBlocker.block();
564 
565                         // We will need a buffer to read into
566                         if (_requestBuffer==null)
567                         {
568                             long content_length=_channel.getRequest().getContentLength();
569                             int size=getInputBufferSize();
570                             if (size<content_length)
571                                 size=size*4; // TODO tune this
572                             _requestBuffer=_bufferPool.acquire(size,REQUEST_BUFFER_DIRECT);
573                         }
574 
575                         // read some data
576                         int filled=getEndPoint().fill(_requestBuffer);
577                         LOG.debug("{} block filled {}",this,filled);
578                         if (filled<0)
579                         {
580                             _parser.shutdownInput();
581                             return;
582                         }
583                     }
584                 }
585             }
586             catch (TimeoutException e)
587             {
588                 throw new EofException(e);
589             }
590             catch (final InterruptedException x)
591             {
592                 throw new InterruptedIOException(getEndPoint().toString()){{initCause(x);}};
593             }
594         }
595 
596         @Override
597         protected void onContentQueued(ByteBuffer ref)
598         {
599             /* This callback could be used to tell the connection
600              * that the request did contain content and thus the request
601              * buffer needs to be held until a call to #onAllContentConsumed
602              *
603              * However it turns out that nothing is needed here because either a
604              * request will have content, in which case the request buffer will be
605              * released by a call to onAllContentConsumed; or it will not have content.
606              * If it does not have content, either it will complete quickly and the
607              * buffers will be released in completed() or it will be suspended and
608              * onReadable() contains explicit handling to release if it is suspended.
609              *
610              * We extend this method anyway, to turn off the notify done by the
611              * default implementation as this is not needed by our implementation
612              * of blockForContent
613              */
614         }
615 
616 
617         @Override
618         protected void onAllContentConsumed()
619         {
620             /* This callback tells the connection that all content that has
621              * been parsed has been consumed. Thus the request buffer may be
622              * released if it is empty.
623              */
624             releaseRequestBuffer();
625         }
626     }
627 
628     private class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
629     {
630         public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
631         {
632             super(connector,config,endPoint,transport,input);
633         }
634 
635         @Override
636         public void badMessage(int status, String reason)
637         {
638             _generator.setPersistent(false);
639             super.badMessage(status,reason);
640         }
641 
642         @Override
643         public boolean headerComplete()
644         {
645             boolean persistent;
646             HttpVersion version = getHttpVersion();
647 
648             switch (version)
649             {
650                 case HTTP_0_9:
651                 {
652                     persistent = false;
653                     break;
654                 }
655                 case HTTP_1_0:
656                 {
657                     persistent = getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString());
658                     if (!persistent)
659                         persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
660                     if (persistent)
661                         getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE);
662                     break;
663                 }
664                 case HTTP_1_1:
665                 {
666                     persistent = !getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
667                     if (!persistent)
668                         persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
669                     if (!persistent)
670                         getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
671                     break;
672                 }
673                 default:
674                 {
675                     throw new IllegalStateException();
676                 }
677             }
678 
679             if (!persistent)
680                 _generator.setPersistent(false);
681 
682             return super.headerComplete();
683         }
684 
685         @Override
686         protected void handleException(Throwable x)
687         {
688             _generator.setPersistent(false);
689             super.handleException(x);
690         }
691 
692     }
693 
694 
695 }