View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.WritePendingException;
24  import java.util.concurrent.RejectedExecutionException;
25  
26  import org.eclipse.jetty.http.HttpGenerator;
27  import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
28  import org.eclipse.jetty.http.HttpHeader;
29  import org.eclipse.jetty.http.HttpHeaderValue;
30  import org.eclipse.jetty.http.HttpMethod;
31  import org.eclipse.jetty.http.HttpParser;
32  import org.eclipse.jetty.http.HttpStatus;
33  import org.eclipse.jetty.http.HttpVersion;
34  import org.eclipse.jetty.io.AbstractConnection;
35  import org.eclipse.jetty.io.ByteBufferPool;
36  import org.eclipse.jetty.io.Connection;
37  import org.eclipse.jetty.io.EndPoint;
38  import org.eclipse.jetty.io.EofException;
39  import org.eclipse.jetty.util.BufferUtil;
40  import org.eclipse.jetty.util.Callback;
41  import org.eclipse.jetty.util.IteratingCallback;
42  import org.eclipse.jetty.util.log.Log;
43  import org.eclipse.jetty.util.log.Logger;
44  
45  /**
46   * <p>A {@link Connection} that handles the HTTP protocol.</p>
47   */
48  public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport
49  {
50      public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
51      private static final boolean REQUEST_BUFFER_DIRECT=false;
52      private static final boolean HEADER_BUFFER_DIRECT=false;
53      private static final boolean CHUNK_BUFFER_DIRECT=false;
54      private static final Logger LOG = Log.getLogger(HttpConnection.class);
55      private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
56  
57      private final HttpConfiguration _config;
58      private final Connector _connector;
59      private final ByteBufferPool _bufferPool;
60      private final HttpGenerator _generator;
61      private final HttpChannelOverHttp _channel;
62      private final HttpParser _parser;
63      private volatile ByteBuffer _requestBuffer = null;
64      private volatile ByteBuffer _chunk = null;
65      private final SendCallback _sendCallback = new SendCallback();
66  
67  
68      /* ------------------------------------------------------------ */
69      /** Get the current connection that this thread is dispatched to.
70       * Note that a thread may be processing a request asynchronously and 
71       * thus not be dispatched to the connection.  
72       * @see Request#getAttribute(String) for a more general way to access the HttpConnection
73       * @return the current HttpConnection or null
74       */
75      public static HttpConnection getCurrentConnection()
76      {
77          return __currentConnection.get();
78      }
79  
80      protected static HttpConnection setCurrentConnection(HttpConnection connection)
81      {
82          HttpConnection last=__currentConnection.get();
83          if (connection==null)
84              __currentConnection.remove();
85          else 
86              __currentConnection.set(connection);
87          return last;
88      }
89  
90      public HttpConfiguration getHttpConfiguration()
91      {
92          return _config;
93      }
94  
95      public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
96      {
97          // Tell AbstractConnector executeOnFillable==true because we want the same thread that
98          // does the HTTP parsing to handle the request so its cache is hot
99          super(endPoint, connector.getExecutor(),true);
100 
101         _config = config;
102         _connector = connector;
103         _bufferPool = _connector.getByteBufferPool();
104         _generator = newHttpGenerator();
105         HttpInput<ByteBuffer> input = newHttpInput();
106         _channel = newHttpChannel(input);
107         _parser = newHttpParser();
108         if (LOG.isDebugEnabled())
109             LOG.debug("New HTTP Connection {}", this);
110     }
111 
112     protected HttpGenerator newHttpGenerator()
113     {
114         return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
115     }
116     
117     protected HttpInput<ByteBuffer> newHttpInput()
118     {
119         return new HttpInputOverHTTP(this);
120     }
121     
122     protected HttpChannelOverHttp newHttpChannel(HttpInput<ByteBuffer> httpInput)
123     {
124         return new HttpChannelOverHttp(_connector, _config, getEndPoint(), this, httpInput);
125     }
126     
127     protected HttpParser newHttpParser()
128     {
129         return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
130     }
131 
132     protected HttpParser.RequestHandler<ByteBuffer> newRequestHandler()
133     {
134         return _channel;
135     }
136 
137     public Server getServer()
138     {
139         return _connector.getServer();
140     }
141 
142     public Connector getConnector()
143     {
144         return _connector;
145     }
146 
147     public HttpChannel<?> getHttpChannel()
148     {
149         return _channel;
150     }
151 
152     public HttpParser getParser()
153     {
154         return _parser;
155     }
156 
157     @Override
158     public int getMessagesIn()
159     {
160         return getHttpChannel().getRequests();
161     }
162 
163     @Override
164     public int getMessagesOut()
165     {
166         return getHttpChannel().getRequests();
167     }
168 
169     void releaseRequestBuffer()
170     {
171         if (_requestBuffer != null && !_requestBuffer.hasRemaining())
172         {
173             ByteBuffer buffer=_requestBuffer;
174             _requestBuffer=null;
175             _bufferPool.release(buffer);
176         }
177     }
178     
179     public ByteBuffer getRequestBuffer()
180     {
181         if (_requestBuffer == null)
182             _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
183         return _requestBuffer;
184     }
185 
186     /**
187      * <p>Parses and handles HTTP messages.</p>
188      * <p>This method is called when this {@link Connection} is ready to read bytes from the {@link EndPoint}.
189      * However, it can also be called if there is unconsumed data in the _requestBuffer, as a result of
190      * resuming a suspended request when there is a pipelined request already read into the buffer.</p>
191      * <p>This method fills bytes and parses them until either: EOF is filled; 0 bytes are filled;
192      * the HttpChannel finishes handling; or the connection has changed.</p>
193      */
194     @Override
195     public void onFillable()
196     {
197         if (LOG.isDebugEnabled())
198             LOG.debug("{} onFillable {}", this, _channel.getState());
199 
200         final HttpConnection last=setCurrentConnection(this);
201         int filled=Integer.MAX_VALUE;
202         boolean suspended=false;
203         try
204         {
205             // while not suspended and not upgraded
206             while (!suspended && getEndPoint().getConnection()==this)
207             {
208                 // Do we need some data to parse
209                 if (BufferUtil.isEmpty(_requestBuffer))
210                 {
211                     // If the previous iteration filled 0 bytes or saw a close, then break here 
212                     if (filled<=0)
213                         break;
214                         
215                     // Can we fill?
216                     if(getEndPoint().isInputShutdown())
217                     {
218                         // No pretend we read -1
219                         filled=-1;
220                         _parser.atEOF();
221                     }
222                     else
223                     {
224                         // Get a buffer
225                         // We are not in a race here for the request buffer as we have not yet received a request,
226                         // so there are not an possible legal threads calling #parseContent or #completed.
227                         _requestBuffer = getRequestBuffer();
228 
229                         // fill
230                         filled = getEndPoint().fill(_requestBuffer);
231                         if (filled==0) // Do a retry on fill 0 (optimization for SSL connections)
232                             filled = getEndPoint().fill(_requestBuffer);
233                         
234                         // tell parser
235                         if (filled < 0)
236                             _parser.atEOF();
237                     }
238                 }
239                 
240                 // Parse the buffer
241                 if (_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
242                 {
243                     // The parser returned true, which indicates the channel is ready to handle a request.
244                     // Call the channel and this will either handle the request/response to completion OR,
245                     // if the request suspends, the request/response will be incomplete so the outer loop will exit.
246                     // Not that onFillable no longer manipulates the request buffer from this point and that is
247                     // left to threads calling #completed or #parseContent (which may be this thread inside handle())
248                     suspended = !_channel.handle();
249                 }
250                 else
251                 {
252                     // We parsed what we could, recycle the request buffer
253                     // We are not in a race here for the request buffer as we have not yet received a request,
254                     // so there are not an possible legal threads calling #parseContent or #completed.
255                     releaseRequestBuffer();
256                 }
257             }
258         }
259         catch (EofException e)
260         {
261             LOG.debug(e);
262         }
263         catch (Exception e)
264         {
265             if (_parser.isIdle())
266                 LOG.debug(e);
267             else
268                 LOG.warn(this.toString(), e);
269             close();
270         }
271         finally
272         {                        
273             setCurrentConnection(last);
274             if (!suspended && getEndPoint().isOpen() && getEndPoint().getConnection()==this)
275             {
276                 fillInterested();
277             }
278         }
279     }
280 
281     /* ------------------------------------------------------------ */
282     /** Fill and parse data looking for content
283      * @throws IOException
284      */
285     protected void parseContent() throws IOException
286     {
287         // Not in a race here for the request buffer with #onFillable because an async consumer of
288         // content would only be started after onFillable has given up control.
289         // In a little bit of a race with #completed, but then not sure if it is legal to be doing 
290         // async calls to IO and have a completed call at the same time.
291         ByteBuffer requestBuffer = getRequestBuffer();
292 
293         while (_parser.inContentState())
294         {
295             // Can the parser progress (even with an empty buffer)
296             boolean parsed = _parser.parseNext(requestBuffer==null?BufferUtil.EMPTY_BUFFER:requestBuffer);
297 
298             // No, we can we try reading some content?
299             if (BufferUtil.isEmpty(requestBuffer) && getEndPoint().isInputShutdown())
300             {
301                 _parser.atEOF();
302                 if (parsed)
303                     break;
304                 continue;
305             }
306 
307             if (parsed)
308                 break;
309             
310             // OK lets read some data
311             int filled=getEndPoint().fill(requestBuffer);
312             if (LOG.isDebugEnabled()) // Avoid boxing of variable 'filled'
313                 LOG.debug("{} filled {}",this,filled);
314             if (filled<=0)
315             {
316                 if (filled<0)
317                 {
318                     _parser.atEOF();
319                     continue;
320                 }
321                 break;
322             }
323         }
324     }
325     
326     @Override
327     public void completed()
328     {
329         // Handle connection upgrades
330         if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
331         {
332             Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
333             if (connection != null)
334             {
335                 if (LOG.isDebugEnabled())
336                     LOG.debug("Upgrade from {} to {}", this, connection);
337                 onClose();
338                 getEndPoint().setConnection(connection);
339                 connection.onOpen();
340                 _channel.reset();
341                 _parser.reset();
342                 _generator.reset();
343                 releaseRequestBuffer();
344                 return;
345             }
346         }
347         
348         // Finish consuming the request
349         // If we are still expecting
350         if (_channel.isExpecting100Continue())
351             // close to seek EOF
352             _parser.close();
353         else if (_parser.inContentState() && _generator.isPersistent())
354             // Complete reading the request
355             _channel.getRequest().getHttpInput().consumeAll();
356 
357         // Reset the channel, parsers and generator
358         _channel.reset();
359         if (_generator.isPersistent() && !_parser.isClosed())
360             _parser.reset();
361         else
362             _parser.close();
363         
364         // Not in a race here with onFillable, because it has given up control before calling handle.
365         // in a slight race with #completed, but not sure what to do with that anyway.
366         releaseRequestBuffer();
367         if (_chunk!=null)
368             _bufferPool.release(_chunk);
369         _chunk=null;
370         _generator.reset();
371 
372         // if we are not called from the onfillable thread, schedule completion
373         if (getCurrentConnection()!=this)
374         {
375             // If we are looking for the next request
376             if (_parser.isStart())
377             {
378                 // if the buffer is empty
379                 if (BufferUtil.isEmpty(_requestBuffer))
380                 {
381                     // look for more data
382                     fillInterested();
383                 }
384                 // else if we are still running
385                 else if (getConnector().isRunning())
386                 {
387                     // Dispatched to handle a pipelined request
388                     try
389                     {
390                         getExecutor().execute(this);
391                     }
392                     catch (RejectedExecutionException e)
393                     {
394                         if (getConnector().isRunning())
395                             LOG.warn(e);
396                         else
397                             LOG.ignore(e);
398                         getEndPoint().close();
399                     }
400                 }
401                 else
402                 {
403                     getEndPoint().close();
404                 }
405             }
406             // else the parser must be closed, so seek the EOF if we are still open 
407             else if (getEndPoint().isOpen())
408                 fillInterested();
409         }
410     }
411 
412     @Override
413     protected void onFillInterestedFailed(Throwable cause)
414     {
415         _parser.close();
416         super.onFillInterestedFailed(cause);
417     }
418 
419     @Override
420     public void onOpen()
421     {
422         super.onOpen();
423         fillInterested();
424     }
425 
426     @Override
427     public void onClose()
428     {
429         _sendCallback.close();
430         super.onClose();
431     }
432 
433     @Override
434     public void run()
435     {
436         onFillable();
437     }
438 
439     @Override
440     public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
441     {
442         // If we are still expecting a 100 continues when we commit
443         if (info!=null && _channel.isExpecting100Continue())
444             // then we can't be persistent
445             _generator.setPersistent(false);
446         
447         if(_sendCallback.reset(info,content,lastContent,callback))
448             _sendCallback.iterate();
449     }
450 
451     @Override
452     public void send(ByteBuffer content, boolean lastContent, Callback callback)
453     {
454         if (!lastContent && BufferUtil.isEmpty(content))
455             callback.succeeded();
456         else if (_sendCallback.reset(null,content,lastContent,callback))
457             _sendCallback.iterate();
458     }
459     
460     protected class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
461     {
462         public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
463         {
464             super(connector,config,endPoint,transport,input);
465         }
466         
467         @Override
468         public void earlyEOF()
469         {
470             // If we have no request yet, just close
471             if (getRequest().getMethod()==null)
472                 close();
473             else
474                 super.earlyEOF();
475         }
476 
477         @Override
478         public boolean content(ByteBuffer item)
479         {
480             super.content(item);
481             return true;
482         }
483 
484         @Override
485         public void badMessage(int status, String reason)
486         {
487             _generator.setPersistent(false);
488             super.badMessage(status,reason);
489         }
490 
491         @Override
492         public boolean headerComplete()
493         {
494             boolean persistent;
495             HttpVersion version = getHttpVersion();
496 
497             switch (version)
498             {
499                 case HTTP_0_9:
500                 {
501                     persistent = false;
502                     break;
503                 }
504                 case HTTP_1_0:
505                 {
506                     persistent = getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString());
507                     if (!persistent)
508                         persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
509                     if (persistent)
510                         getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE);
511                     break;
512                 }
513                 case HTTP_1_1:
514                 {
515                     persistent = !getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
516                     if (!persistent)
517                         persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
518                     if (!persistent)
519                         getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
520                     break;
521                 }
522                 default:
523                 {
524                     throw new IllegalStateException();
525                 }
526             }
527 
528             if (!persistent)
529                 _generator.setPersistent(false);
530 
531             return super.headerComplete();
532         }
533 
534         @Override
535         protected void handleException(Throwable x)
536         {
537             _generator.setPersistent(false);
538             super.handleException(x);
539         }
540 
541         @Override
542         public void abort()
543         {
544             super.abort();
545             _generator.setPersistent(false);
546         }
547 
548         @Override
549         public boolean messageComplete()
550         {
551             super.messageComplete();
552             return false;
553         }
554     }
555 
556     private class SendCallback extends IteratingCallback
557     {
558         private ResponseInfo _info;
559         private ByteBuffer _content;
560         private boolean _lastContent;
561         private Callback _callback;
562         private ByteBuffer _header;
563         private boolean _shutdownOut;
564 
565         private SendCallback()
566         {
567             super(true);
568         }
569 
570         private boolean reset(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
571         {
572             if (reset())
573             {
574                 _info = info;
575                 _content = content;
576                 _lastContent = last;
577                 _callback = callback;
578                 _header = null;
579                 _shutdownOut = false;
580                 return true;
581             }
582             
583             if (isClosed())
584                 callback.failed(new EofException());
585             else
586                 callback.failed(new WritePendingException());
587             return false;
588         }
589 
590         @Override
591         public Action process() throws Exception
592         {
593             if (_callback==null)
594                 throw new IllegalStateException();
595             
596             ByteBuffer chunk = _chunk;
597             while (true)
598             {
599                 HttpGenerator.Result result = _generator.generateResponse(_info, _header, chunk, _content, _lastContent);
600                 if (LOG.isDebugEnabled())
601                     LOG.debug("{} generate: {} ({},{},{})@{}",
602                         this,
603                         result,
604                         BufferUtil.toSummaryString(_header),
605                         BufferUtil.toSummaryString(_content),
606                         _lastContent,
607                         _generator.getState());
608 
609                 switch (result)
610                 {
611                     case NEED_HEADER:
612                     {
613                         _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);    
614                         continue;
615                     }
616                     case NEED_CHUNK:
617                     {
618                         chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
619                         continue;
620                     }
621                     case FLUSH:
622                     {
623                         // Don't write the chunk or the content if this is a HEAD response, or any other type of response that should have no content
624                         if (_channel.getRequest().isHead() || _generator.isNoContent())
625                         {
626                             BufferUtil.clear(chunk);
627                             BufferUtil.clear(_content);
628                         }
629 
630                         // If we have a header
631                         if (BufferUtil.hasContent(_header))
632                         {
633                             if (BufferUtil.hasContent(_content))
634                             {
635                                 if (BufferUtil.hasContent(chunk))
636                                     getEndPoint().write(this, _header, chunk, _content);
637                                 else
638                                     getEndPoint().write(this, _header, _content);
639                             }
640                             else
641                                 getEndPoint().write(this, _header);
642                         }
643                         else if (BufferUtil.hasContent(chunk))
644                         {
645                             if (BufferUtil.hasContent(_content))
646                                 getEndPoint().write(this, chunk, _content);
647                             else
648                                 getEndPoint().write(this, chunk);
649                         }
650                         else if (BufferUtil.hasContent(_content))
651                         {
652                             getEndPoint().write(this, _content);
653                         }
654                         else
655                         {
656                             succeeded(); // nothing to write
657                         }
658                         return Action.SCHEDULED;
659                     }
660                     case SHUTDOWN_OUT:
661                     {
662                         _shutdownOut=true;
663                         continue;
664                     }
665                     case DONE:
666                     {
667                         return Action.SUCCEEDED;
668                     }
669                     case CONTINUE:
670                     {
671                         break;
672                     }
673                     default:
674                     {
675                         throw new IllegalStateException("generateResponse="+result);
676                     }
677                 }
678             }
679         }
680 
681         private void releaseHeader()
682         {
683             ByteBuffer h=_header;
684             _header=null;
685             if (h!=null)
686                 _bufferPool.release(h);
687         }
688         
689         @Override
690         protected void onCompleteSuccess()
691         {
692             releaseHeader();
693             _callback.succeeded();
694             if (_shutdownOut)
695                 getEndPoint().shutdownOutput();
696         }
697 
698         @Override
699         public void onCompleteFailure(final Throwable x)
700         {
701             releaseHeader();
702             failedCallback(_callback,x);
703             if (_shutdownOut)
704                 getEndPoint().shutdownOutput();
705         }
706         
707         @Override
708         public String toString()
709         {
710             return String.format("%s[i=%s,cb=%s]",super.toString(),_info,_callback);
711         }
712     }
713 
714 
715     @Override
716     public void abort()
717     {
718         // Do a direct close of the output, as this may indicate to a client that the 
719         // response is bad either with RST or by abnormal completion of chunked response.
720         getEndPoint().close();
721     }
722 
723 }