View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.WritePendingException;
24  import java.util.concurrent.RejectedExecutionException;
25  
26  import org.eclipse.jetty.http.HttpGenerator;
27  import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
28  import org.eclipse.jetty.http.HttpHeader;
29  import org.eclipse.jetty.http.HttpHeaderValue;
30  import org.eclipse.jetty.http.HttpMethod;
31  import org.eclipse.jetty.http.HttpParser;
32  import org.eclipse.jetty.http.HttpStatus;
33  import org.eclipse.jetty.http.HttpVersion;
34  import org.eclipse.jetty.io.AbstractConnection;
35  import org.eclipse.jetty.io.ByteBufferPool;
36  import org.eclipse.jetty.io.Connection;
37  import org.eclipse.jetty.io.EndPoint;
38  import org.eclipse.jetty.io.EofException;
39  import org.eclipse.jetty.util.BufferUtil;
40  import org.eclipse.jetty.util.Callback;
41  import org.eclipse.jetty.util.IteratingCallback;
42  import org.eclipse.jetty.util.log.Log;
43  import org.eclipse.jetty.util.log.Logger;
44  
45  /**
46   * <p>A {@link Connection} that handles the HTTP protocol.</p>
47   */
48  public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport
49  {
50      public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
51      private static final boolean REQUEST_BUFFER_DIRECT=false;
52      private static final boolean HEADER_BUFFER_DIRECT=false;
53      private static final boolean CHUNK_BUFFER_DIRECT=false;
54      private static final Logger LOG = Log.getLogger(HttpConnection.class);
55      private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
56  
57      private final HttpConfiguration _config;
58      private final Connector _connector;
59      private final ByteBufferPool _bufferPool;
60      private final HttpGenerator _generator;
61      private final HttpChannelOverHttp _channel;
62      private final HttpParser _parser;
63      private volatile ByteBuffer _requestBuffer = null;
64      private volatile ByteBuffer _chunk = null;
65      private final SendCallback _sendCallback = new SendCallback();
66  
67  
68      /* ------------------------------------------------------------ */
69      /** Get the current connection that this thread is dispatched to.
70       * Note that a thread may be processing a request asynchronously and 
71       * thus not be dispatched to the connection.  
72       * @see Request#getAttribute(String) for a more general way to access the HttpConnection
73       * @return the current HttpConnection or null
74       */
75      public static HttpConnection getCurrentConnection()
76      {
77          return __currentConnection.get();
78      }
79  
80      protected static HttpConnection setCurrentConnection(HttpConnection connection)
81      {
82          HttpConnection last=__currentConnection.get();
83          if (connection==null)
84              __currentConnection.remove();
85          else 
86              __currentConnection.set(connection);
87          return last;
88      }
89  
90      public HttpConfiguration getHttpConfiguration()
91      {
92          return _config;
93      }
94  
95      public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
96      {
97          // Tell AbstractConnector executeOnFillable==true because we want the same thread that
98          // does the HTTP parsing to handle the request so its cache is hot
99          super(endPoint, connector.getExecutor(),true);
100 
101         _config = config;
102         _connector = connector;
103         _bufferPool = _connector.getByteBufferPool();
104         _generator = newHttpGenerator();
105         HttpInput<ByteBuffer> input = newHttpInput();
106         _channel = newHttpChannel(input);
107         _parser = newHttpParser();
108         if (LOG.isDebugEnabled())
109             LOG.debug("New HTTP Connection {}", this);
110     }
111 
112     protected HttpGenerator newHttpGenerator()
113     {
114         return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
115     }
116     
117     protected HttpInput<ByteBuffer> newHttpInput()
118     {
119         return new HttpInputOverHTTP(this);
120     }
121     
122     protected HttpChannelOverHttp newHttpChannel(HttpInput<ByteBuffer> httpInput)
123     {
124         return new HttpChannelOverHttp(_connector, _config, getEndPoint(), this, httpInput);
125     }
126     
127     protected HttpParser newHttpParser()
128     {
129         return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
130     }
131 
132     protected HttpParser.RequestHandler<ByteBuffer> newRequestHandler()
133     {
134         return _channel;
135     }
136 
137     public Server getServer()
138     {
139         return _connector.getServer();
140     }
141 
142     public Connector getConnector()
143     {
144         return _connector;
145     }
146 
147     public HttpChannel<?> getHttpChannel()
148     {
149         return _channel;
150     }
151 
152     public HttpParser getParser()
153     {
154         return _parser;
155     }
156 
157     @Override
158     public int getMessagesIn()
159     {
160         return getHttpChannel().getRequests();
161     }
162 
163     @Override
164     public int getMessagesOut()
165     {
166         return getHttpChannel().getRequests();
167     }
168 
169     void releaseRequestBuffer()
170     {
171         if (_requestBuffer != null && !_requestBuffer.hasRemaining())
172         {
173             ByteBuffer buffer=_requestBuffer;
174             _requestBuffer=null;
175             _bufferPool.release(buffer);
176         }
177     }
178     
179     public ByteBuffer getRequestBuffer()
180     {
181         if (_requestBuffer == null)
182             _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
183         return _requestBuffer;
184     }
185 
186     /**
187      * <p>Parses and handles HTTP messages.</p>
188      * <p>This method is called when this {@link Connection} is ready to read bytes from the {@link EndPoint}.
189      * However, it can also be called if there is unconsumed data in the _requestBuffer, as a result of
190      * resuming a suspended request when there is a pipelined request already read into the buffer.</p>
191      * <p>This method fills bytes and parses them until either: EOF is filled; 0 bytes are filled;
192      * the HttpChannel finishes handling; or the connection has changed.</p>
193      */
194     @Override
195     public void onFillable()
196     {
197         if (LOG.isDebugEnabled())
198             LOG.debug("{} onFillable {}", this, _channel.getState());
199 
200         final HttpConnection last=setCurrentConnection(this);
201         int filled=Integer.MAX_VALUE;
202         boolean suspended=false;
203         try
204         {
205             // while not suspended and not upgraded
206             while (!suspended && getEndPoint().getConnection()==this)
207             {
208                 // Do we need some data to parse
209                 if (BufferUtil.isEmpty(_requestBuffer))
210                 {
211                     // If the previous iteration filled 0 bytes or saw a close, then break here 
212                     if (filled<=0)
213                         break;
214                         
215                     // Can we fill?
216                     if(getEndPoint().isInputShutdown())
217                     {
218                         // No pretend we read -1
219                         filled=-1;
220                         _parser.atEOF();
221                     }
222                     else
223                     {
224                         // Get a buffer
225                         // We are not in a race here for the request buffer as we have not yet received a request,
226                         // so there are not an possible legal threads calling #parseContent or #completed.
227                         _requestBuffer = getRequestBuffer();
228 
229                         // fill
230                         filled = getEndPoint().fill(_requestBuffer);
231                         if (filled==0) // Do a retry on fill 0 (optimization for SSL connections)
232                             filled = getEndPoint().fill(_requestBuffer);
233                         
234                         // tell parser
235                         if (filled < 0)
236                             _parser.atEOF();
237                     }
238                 }
239                 
240                 // Parse the buffer
241                 if (_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
242                 {
243                     // The parser returned true, which indicates the channel is ready to handle a request.
244                     // Call the channel and this will either handle the request/response to completion OR,
245                     // if the request suspends, the request/response will be incomplete so the outer loop will exit.
246                     // Not that onFillable no longer manipulates the request buffer from this point and that is
247                     // left to threads calling #completed or #parseContent (which may be this thread inside handle())
248                     suspended = !_channel.handle();
249                 }
250                 else
251                 {
252                     // We parsed what we could, recycle the request buffer
253                     // We are not in a race here for the request buffer as we have not yet received a request,
254                     // so there are not an possible legal threads calling #parseContent or #completed.
255                     releaseRequestBuffer();
256                 }
257             }
258         }
259         catch (EofException e)
260         {
261             LOG.debug(e);
262         }
263         catch (Exception e)
264         {
265             if (_parser.isIdle())
266                 LOG.debug(e);
267             else
268                 LOG.warn(this.toString(), e);
269             close();
270         }
271         finally
272         {                        
273             setCurrentConnection(last);
274             if (!suspended && getEndPoint().isOpen() && getEndPoint().getConnection()==this)
275             {
276                 fillInterested();
277             }
278         }
279     }
280 
281     /* ------------------------------------------------------------ */
282     /** Fill and parse data looking for content
283      * @throws IOException
284      */
285     protected void parseContent() throws IOException
286     {
287         // Not in a race here for the request buffer with #onFillable because an async consumer of
288         // content would only be started after onFillable has given up control.
289         // In a little bit of a race with #completed, but then not sure if it is legal to be doing 
290         // async calls to IO and have a completed call at the same time.
291         ByteBuffer requestBuffer = getRequestBuffer();
292 
293         while (_parser.inContentState())
294         {
295             // Can the parser progress (even with an empty buffer)
296             boolean parsed = _parser.parseNext(requestBuffer==null?BufferUtil.EMPTY_BUFFER:requestBuffer);
297 
298             // No, we can we try reading some content?
299             if (BufferUtil.isEmpty(requestBuffer) && getEndPoint().isInputShutdown())
300             {
301                 _parser.atEOF();
302                 if (parsed)
303                     break;
304                 continue;
305             }
306 
307             if (parsed)
308                 break;
309             
310             // OK lets read some data
311             int filled=getEndPoint().fill(requestBuffer);
312             if (LOG.isDebugEnabled()) // Avoid boxing of variable 'filled'
313                 LOG.debug("{} filled {}",this,filled);
314             if (filled<=0)
315             {
316                 if (filled<0)
317                 {
318                     _parser.atEOF();
319                     continue;
320                 }
321                 break;
322             }
323         }
324     }
325     
326     @Override
327     public void completed()
328     {
329         // Handle connection upgrades
330         if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
331         {
332             Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
333             if (connection != null)
334             {
335                 if (LOG.isDebugEnabled())
336                     LOG.debug("Upgrade from {} to {}", this, connection);
337                 onClose();
338                 getEndPoint().setConnection(connection);
339                 connection.onOpen();
340                 _channel.reset();
341                 _parser.reset();
342                 _generator.reset();
343                 releaseRequestBuffer();
344                 return;
345             }
346         }
347         
348         // Finish consuming the request
349         // If we are still expecting
350         if (_channel.isExpecting100Continue())
351         {
352             // close to seek EOF
353             _parser.close();
354         }
355         else if (_parser.inContentState() && _generator.isPersistent())
356         {
357             // If we are async, then we have problems to complete neatly
358             if (_channel.getRequest().getHttpInput().isAsync())
359             {
360                 if (LOG.isDebugEnabled())
361                     LOG.debug("unconsumed async input {}", this);
362                 _channel.abort();
363             }
364             else
365             {
366                 if (LOG.isDebugEnabled())
367                     LOG.debug("unconsumed input {}", this);
368                 // Complete reading the request
369                 if (!_channel.getRequest().getHttpInput().consumeAll())
370                     _channel.abort();
371             }
372         }
373 
374         // Reset the channel, parsers and generator
375         _channel.reset();
376         if (_generator.isPersistent() && !_parser.isClosed())
377             _parser.reset();
378         else
379             _parser.close();
380         
381         // Not in a race here with onFillable, because it has given up control before calling handle.
382         // in a slight race with #completed, but not sure what to do with that anyway.
383         releaseRequestBuffer();
384         if (_chunk!=null)
385             _bufferPool.release(_chunk);
386         _chunk=null;
387         _generator.reset();
388 
389         // if we are not called from the onfillable thread, schedule completion
390         if (getCurrentConnection()!=this)
391         {
392             // If we are looking for the next request
393             if (_parser.isStart())
394             {
395                 // if the buffer is empty
396                 if (BufferUtil.isEmpty(_requestBuffer))
397                 {
398                     // look for more data
399                     fillInterested();
400                 }
401                 // else if we are still running
402                 else if (getConnector().isRunning())
403                 {
404                     // Dispatched to handle a pipelined request
405                     try
406                     {
407                         getExecutor().execute(this);
408                     }
409                     catch (RejectedExecutionException e)
410                     {
411                         if (getConnector().isRunning())
412                             LOG.warn(e);
413                         else
414                             LOG.ignore(e);
415                         getEndPoint().close();
416                     }
417                 }
418                 else
419                 {
420                     getEndPoint().close();
421                 }
422             }
423             // else the parser must be closed, so seek the EOF if we are still open 
424             else if (getEndPoint().isOpen())
425                 fillInterested();
426         }
427     }
428 
429     @Override
430     protected void onFillInterestedFailed(Throwable cause)
431     {
432         _parser.close();
433         super.onFillInterestedFailed(cause);
434     }
435 
436     @Override
437     public void onOpen()
438     {
439         super.onOpen();
440         fillInterested();
441     }
442 
443     @Override
444     public void onClose()
445     {
446         _sendCallback.close();
447         super.onClose();
448     }
449 
450     @Override
451     public void run()
452     {
453         onFillable();
454     }
455 
456     @Override
457     public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
458     {
459         // If we are still expecting a 100 continues when we commit
460         if (info!=null && _channel.isExpecting100Continue())
461             // then we can't be persistent
462             _generator.setPersistent(false);
463         
464         if(_sendCallback.reset(info,content,lastContent,callback))
465             _sendCallback.iterate();
466     }
467 
468     @Override
469     public void send(ByteBuffer content, boolean lastContent, Callback callback)
470     {
471         if (!lastContent && BufferUtil.isEmpty(content))
472             callback.succeeded();
473         else if (_sendCallback.reset(null,content,lastContent,callback))
474             _sendCallback.iterate();
475     }
476     
477     protected class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
478     {        
479         public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
480         {
481             super(connector,config,endPoint,transport,input);
482         }
483         
484         @Override
485         public void earlyEOF()
486         {
487             // If we have no request yet, just close
488             if (getRequest().getMethod()==null)
489                 close();
490             else
491                 super.earlyEOF();
492         }
493 
494         @Override
495         public boolean content(ByteBuffer item)
496         {
497             super.content(item);
498             return true;
499         }
500 
501         @Override
502         public void badMessage(int status, String reason)
503         {
504             _generator.setPersistent(false);
505             super.badMessage(status,reason);
506         }
507 
508         @Override
509         public boolean headerComplete()
510         {
511             boolean persistent;
512             HttpVersion version = getHttpVersion();
513 
514             switch (version)
515             {
516                 case HTTP_0_9:
517                 {
518                     persistent = false;
519                     break;
520                 }
521                 case HTTP_1_0:
522                 {
523                     persistent = getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString());
524                     if (!persistent)
525                         persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
526                     if (persistent)
527                         getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE);
528                     break;
529                 }
530                 case HTTP_1_1:
531                 {
532                     persistent = !getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
533                     if (!persistent)
534                         persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
535                     if (!persistent)
536                         getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
537                     break;
538                 }
539                 default:
540                 {
541                     throw new IllegalStateException();
542                 }
543             }
544 
545             if (!persistent)
546                 _generator.setPersistent(false);
547 
548             if (!super.headerComplete())
549                 return false;
550             
551             // Should we delay dispatch until we have some content?
552             // We should not delay if there is no content expect or client is expecting 100 or the response is already committed or the request buffer already has something in it to parse
553             if (getHttpConfiguration().isDelayDispatchUntilContent() && _parser.getContentLength() > 0 &&
554                     !isExpecting100Continue() && !isCommitted() && BufferUtil.isEmpty(_requestBuffer))
555                 return false;
556 
557             return true;
558         }
559 
560         @Override
561         protected void handleException(Throwable x)
562         {
563             _generator.setPersistent(false);
564             super.handleException(x);
565         }
566 
567         @Override
568         public void abort()
569         {
570             super.abort();
571             _generator.setPersistent(false);
572         }
573 
574         @Override
575         public boolean messageComplete()
576         {
577             super.messageComplete();
578             return false;
579         }
580     }
581 
582     private class SendCallback extends IteratingCallback
583     {
584         private ResponseInfo _info;
585         private ByteBuffer _content;
586         private boolean _lastContent;
587         private Callback _callback;
588         private ByteBuffer _header;
589         private boolean _shutdownOut;
590 
591         private SendCallback()
592         {
593             super(true);
594         }
595 
596         private boolean reset(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
597         {
598             if (reset())
599             {
600                 _info = info;
601                 _content = content;
602                 _lastContent = last;
603                 _callback = callback;
604                 _header = null;
605                 _shutdownOut = false;
606                 return true;
607             }
608             
609             if (isClosed())
610                 callback.failed(new EofException());
611             else
612                 callback.failed(new WritePendingException());
613             return false;
614         }
615 
616         @Override
617         public Action process() throws Exception
618         {
619             if (_callback==null)
620                 throw new IllegalStateException();
621             
622             ByteBuffer chunk = _chunk;
623             while (true)
624             {
625                 HttpGenerator.Result result = _generator.generateResponse(_info, _header, chunk, _content, _lastContent);
626                 if (LOG.isDebugEnabled())
627                     LOG.debug("{} generate: {} ({},{},{})@{}",
628                         this,
629                         result,
630                         BufferUtil.toSummaryString(_header),
631                         BufferUtil.toSummaryString(_content),
632                         _lastContent,
633                         _generator.getState());
634 
635                 switch (result)
636                 {
637                     case NEED_HEADER:
638                     {
639                         _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);    
640                         continue;
641                     }
642                     case NEED_CHUNK:
643                     {
644                         chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
645                         continue;
646                     }
647                     case FLUSH:
648                     {
649                         // Don't write the chunk or the content if this is a HEAD response, or any other type of response that should have no content
650                         if (_channel.getRequest().isHead() || _generator.isNoContent())
651                         {
652                             BufferUtil.clear(chunk);
653                             BufferUtil.clear(_content);
654                         }
655 
656                         // If we have a header
657                         if (BufferUtil.hasContent(_header))
658                         {
659                             if (BufferUtil.hasContent(_content))
660                             {
661                                 if (BufferUtil.hasContent(chunk))
662                                     getEndPoint().write(this, _header, chunk, _content);
663                                 else
664                                     getEndPoint().write(this, _header, _content);
665                             }
666                             else
667                                 getEndPoint().write(this, _header);
668                         }
669                         else if (BufferUtil.hasContent(chunk))
670                         {
671                             if (BufferUtil.hasContent(_content))
672                                 getEndPoint().write(this, chunk, _content);
673                             else
674                                 getEndPoint().write(this, chunk);
675                         }
676                         else if (BufferUtil.hasContent(_content))
677                         {
678                             getEndPoint().write(this, _content);
679                         }
680                         else
681                         {
682                             succeeded(); // nothing to write
683                         }
684                         return Action.SCHEDULED;
685                     }
686                     case SHUTDOWN_OUT:
687                     {
688                         _shutdownOut=true;
689                         continue;
690                     }
691                     case DONE:
692                     {
693                         return Action.SUCCEEDED;
694                     }
695                     case CONTINUE:
696                     {
697                         break;
698                     }
699                     default:
700                     {
701                         throw new IllegalStateException("generateResponse="+result);
702                     }
703                 }
704             }
705         }
706 
707         private void releaseHeader()
708         {
709             ByteBuffer h=_header;
710             _header=null;
711             if (h!=null)
712                 _bufferPool.release(h);
713         }
714         
715         @Override
716         protected void onCompleteSuccess()
717         {
718             releaseHeader();
719             _callback.succeeded();
720             if (_shutdownOut)
721                 getEndPoint().shutdownOutput();
722         }
723 
724         @Override
725         public void onCompleteFailure(final Throwable x)
726         {
727             releaseHeader();
728             failedCallback(_callback,x);
729             if (_shutdownOut)
730                 getEndPoint().shutdownOutput();
731         }
732         
733         @Override
734         public String toString()
735         {
736             return String.format("%s[i=%s,cb=%s]",super.toString(),_info,_callback);
737         }
738     }
739 
740 
741     @Override
742     public void abort()
743     {
744         // Do a direct close of the output, as this may indicate to a client that the 
745         // response is bad either with RST or by abnormal completion of chunked response.
746         getEndPoint().close();
747     }
748 
749 }