View Javadoc

//
//  ========================================================================
//  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//
18  
19  package org.eclipse.jetty.server;
20  
21  import java.nio.ByteBuffer;
22  import java.nio.channels.ByteChannel;
23  import java.util.concurrent.RejectedExecutionException;
24  
25  import org.eclipse.jetty.http.HttpGenerator;
26  import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
27  import org.eclipse.jetty.http.HttpHeader;
28  import org.eclipse.jetty.http.HttpHeaderValue;
29  import org.eclipse.jetty.http.HttpMethod;
30  import org.eclipse.jetty.http.HttpParser;
31  import org.eclipse.jetty.http.HttpStatus;
32  import org.eclipse.jetty.http.HttpVersion;
33  import org.eclipse.jetty.io.AbstractConnection;
34  import org.eclipse.jetty.io.ByteBufferPool;
35  import org.eclipse.jetty.io.Connection;
36  import org.eclipse.jetty.io.EndPoint;
37  import org.eclipse.jetty.io.EofException;
38  import org.eclipse.jetty.util.BufferUtil;
39  import org.eclipse.jetty.util.Callback;
40  import org.eclipse.jetty.util.IteratingNestedCallback;
41  import org.eclipse.jetty.util.log.Log;
42  import org.eclipse.jetty.util.log.Logger;
43  
44  /**
45   * <p>A {@link Connection} that handles the HTTP protocol.</p>
46   */
47  public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport
48  {
    /**
     * Name of the request attribute that {@link #completed()} reads, after a 101 response,
     * to obtain the {@link Connection} that replaces this one on the {@link EndPoint}.
     */
    public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
    // Second argument given to _bufferPool.acquire(...) for the request, header and chunk buffers respectively
    private static final boolean REQUEST_BUFFER_DIRECT=false;
    private static final boolean HEADER_BUFFER_DIRECT=false;
    private static final boolean CHUNK_BUFFER_DIRECT=false;
    private static final Logger LOG = Log.getLogger(HttpConnection.class);
    // Set by onFillable() on entry and cleared in its finally block; read by completed()
    // to detect whether it is being called from within onFillable() on the same thread
    private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();

    // Collaborators, all assigned once in the constructor
    private final HttpConfiguration _config;
    private final Connector _connector;
    private final ByteBufferPool _bufferPool;
    private final HttpGenerator _generator;
    private final HttpChannelOverHttp _channel;
    private final HttpParser _parser;
    // Acquired lazily from _bufferPool; handed back by releaseRequestBuffer() once it has no remaining bytes
    private volatile ByteBuffer _requestBuffer = null;
    // Acquired by the send callbacks when the generator reports NEED_CHUNK; released in completed()
    private volatile ByteBuffer _chunk = null;
64  
65  
66      public static HttpConnection getCurrentConnection()
67      {
68          return __currentConnection.get();
69      }
70  
71      protected static void setCurrentConnection(HttpConnection connection)
72      {
73          __currentConnection.set(connection);
74      }
75  
76      public HttpConfiguration getHttpConfiguration()
77      {
78          return _config;
79      }
80  
81      public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
82      {
83          // Tell AbstractConnector executeOnFillable==true because we want the same thread that
84          // does the HTTP parsing to handle the request so its cache is hot
85          super(endPoint, connector.getExecutor(),true);
86  
87          _config = config;
88          _connector = connector;
89          _bufferPool = _connector.getByteBufferPool();
90          _generator = newHttpGenerator();
91          HttpInput<ByteBuffer> input = newHttpInput();
92          _channel = newHttpChannel(input);
93          _parser = newHttpParser();
94          LOG.debug("New HTTP Connection {}", this);
95      }
96  
97      protected HttpGenerator newHttpGenerator()
98      {
99          return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
100     }
101     
102     protected HttpInput<ByteBuffer> newHttpInput()
103     {
104         return new HttpInputOverHTTP(this);
105     }
106     
107     protected HttpChannelOverHttp newHttpChannel(HttpInput<ByteBuffer> httpInput)
108     {
109         return new HttpChannelOverHttp(_connector, _config, getEndPoint(), this, httpInput);
110     }
111     
112     protected HttpParser newHttpParser()
113     {
114         return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
115     }
116 
117     protected HttpParser.RequestHandler<ByteBuffer> newRequestHandler()
118     {
119         return _channel;
120     }
121 
122     public Server getServer()
123     {
124         return _connector.getServer();
125     }
126 
127     public Connector getConnector()
128     {
129         return _connector;
130     }
131 
132     public HttpChannel<?> getHttpChannel()
133     {
134         return _channel;
135     }
136 
137     public HttpParser getParser()
138     {
139         return _parser;
140     }
141 
142     @Override
143     public int getMessagesIn()
144     {
145         return getHttpChannel().getRequests();
146     }
147 
148     @Override
149     public int getMessagesOut()
150     {
151         return getHttpChannel().getRequests();
152     }
153 
154     void releaseRequestBuffer()
155     {
156         if (_requestBuffer != null && !_requestBuffer.hasRemaining())
157         {
158             ByteBuffer buffer=_requestBuffer;
159             _requestBuffer=null;
160             _bufferPool.release(buffer);
161         }
162     }
163     
164     public ByteBuffer getRequestBuffer()
165     {
166         if (_requestBuffer == null)
167             _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
168         return _requestBuffer;
169     }
170 
    /**
     * <p>Parses and handles HTTP messages.</p>
     * <p>This method is called when this {@link Connection} is ready to read bytes from the {@link EndPoint}.
     * However, it can also be called if there is unconsumed data in the _requestBuffer, as a result of
     * resuming a suspended request when there is a pipelined request already read into the buffer.</p>
     * <p>This method fills bytes and parses them until either: EOF is filled; 0 bytes are filled;
     * the HttpChannel's {@code handle()} returns false (treated here as the request having suspended);
     * or the connection registered on the end point is no longer this one.</p>
     * <p>While it runs, this connection is registered as the thread's current connection. On exit,
     * unless the request suspended, fill interest is registered again if the end point is still open
     * and still bound to this connection.</p>
     */
    @Override
    public void onFillable()
    {
        LOG.debug("{} onFillable {}", this, _channel.getState());

        setCurrentConnection(this);
        // Start with a positive value so that the first pass through the loop is allowed to fill
        int filled=Integer.MAX_VALUE;
        boolean suspended=false;
        try
        {
            // while not suspended and not upgraded
            while (!suspended && getEndPoint().getConnection()==this)
            {
                // Do we need some data to parse
                if (BufferUtil.isEmpty(_requestBuffer))
                {
                    // If the previous iteration filled 0 bytes or saw a close, then break here 
                    if (filled<=0)
                        break;
                        
                    // Can we fill?
                    if(getEndPoint().isInputShutdown())
                    {
                        // No, so pretend we read -1
                        filled=-1;
                        _parser.atEOF();
                    }
                    else
                    {
                        // Get a buffer
                        if (_requestBuffer == null)
                            _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);

                        // fill
                        filled = getEndPoint().fill(_requestBuffer);
                        if (filled==0) // Do a retry on fill 0 (optimization for SSL connections)
                            filled = getEndPoint().fill(_requestBuffer);
                        
                        // tell parser
                        if (filled < 0)
                            _parser.atEOF();
                    }
                }
                
                // Parse the buffer (an empty buffer is substituted when none is held, e.g. at EOF)
                if (_parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer))
                {
                    // The parser returned true, which indicates the channel is ready to handle a request.
                    // Call the channel and this will either handle the request/response to completion OR,
                    // if the request suspends, the request/response will be incomplete so the outer loop will exit.
                    suspended = !_channel.handle();
                }
                
            }
        }
        catch (EofException e)
        {
            // Only logged at debug level; the connection is not explicitly closed here
            LOG.debug(e);
        }
        catch (Exception e)
        {
            // A failure while the parser is idle is logged quietly, otherwise it is a warning;
            // in both cases the connection is closed
            if (_parser.isIdle())
                LOG.debug(e);
            else
                LOG.warn(this.toString(), e);
            close();
        }
        finally
        {                        
            setCurrentConnection(null);
            // Look for more input unless the request suspended, the end point closed, or we were upgraded
            if (!suspended && getEndPoint().isOpen() && getEndPoint().getConnection()==this)
            {
                fillInterested();
            }
        }
    }
255     
256 
257     @Override
258     protected void onFillInterestedFailed(Throwable cause)
259     {
260         _parser.close();
261         super.onFillInterestedFailed(cause);
262     }
263 
264     @Override
265     public void onOpen()
266     {
267         super.onOpen();
268         fillInterested();
269     }
270 
271     @Override
272     public void run()
273     {
274         onFillable();
275     }
276 
277 
278     @Override
279     public void send(ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
280     {
281         if (info==null)
282             new ContentCallback(content,lastContent,callback).iterate();
283         else
284         {
285             // If we are still expecting a 100 continues
286             if (_channel.isExpecting100Continue())
287                 // then we can't be persistent
288                 _generator.setPersistent(false);
289             new CommitCallback(info,content,lastContent,callback).iterate();
290         }
291     }
292 
293     @Override
294     public void send(ByteBuffer content, boolean lastContent, Callback callback)
295     {
296         new ContentCallback(content,lastContent,callback).iterate();
297     }
298 
    /**
     * Finishes the current request/response exchange on this connection.
     * <p>If the response status is 101 and the request carries a {@link Connection} under
     * {@link #UPGRADE_CONNECTION_ATTRIBUTE}, the end point is switched to that connection and
     * this method returns after resetting the channel, parser and generator.</p>
     * <p>Otherwise any remaining request content is dealt with, the channel, parser and generator
     * are reset (or the parser closed when the connection is not persistent), and pooled buffers
     * are released. When not invoked from within {@link #onFillable()} on the calling thread, this
     * method also arranges for the next request to be read or dispatched.</p>
     */
    @Override
    public void completed()
    {
        // Handle connection upgrades
        if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
        {
            Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
            if (connection != null)
            {
                LOG.debug("Upgrade from {} to {}", this, connection);
                // Hand the end point over to the new connection before resetting our own state
                onClose();
                getEndPoint().setConnection(connection);
                connection.onOpen();
                _channel.reset();
                _parser.reset();
                _generator.reset();
                releaseRequestBuffer();
                return;
            }
        }
        
        // Finish consuming the request
        // If we are still expecting
        if (_channel.isExpecting100Continue())
            // close to seek EOF
            _parser.close();
        else if (_parser.inContentState() && _generator.isPersistent())
            // Complete reading the request
            _channel.getRequest().getHttpInput().consumeAll();

        // Reset the channel, parsers and generator
        _channel.reset();
        // Persistence is checked before the generator is reset below
        if (_generator.isPersistent() && !_parser.isClosed())
            _parser.reset();
        else
            _parser.close();
        releaseRequestBuffer();
        if (_chunk!=null)
            _bufferPool.release(_chunk);
        _chunk=null;
        _generator.reset();

        // if we are not called from the onfillable thread, schedule completion
        if (getCurrentConnection()!=this)
        {
            // If we are looking for the next request
            if (_parser.isStart())
            {
                // if the buffer is empty
                if (_requestBuffer == null)
                {
                    // look for more data
                    fillInterested();
                }
                // else if we are still running
                else if (getConnector().isRunning())
                {
                    // Dispatched to handle a pipelined request
                    try
                    {
                        getExecutor().execute(this);
                    }
                    catch (RejectedExecutionException e)
                    {
                        // A rejection while the connector is still running is unexpected, so warn;
                        // otherwise it is ignored. Either way the end point is closed.
                        if (getConnector().isRunning())
                            LOG.warn(e);
                        else
                            LOG.ignore(e);
                        getEndPoint().close();
                    }
                }
                else
                {
                    // Buffered data but the connector is no longer running: close the end point
                    getEndPoint().close();
                }
            }
            // else the parser must be closed, so seek the EOF if we are still open 
            else if (getEndPoint().isOpen())
                fillInterested();
        }
    }
380 
381     protected class HttpChannelOverHttp extends HttpChannel<ByteBuffer>
382     {
383         public HttpChannelOverHttp(Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput<ByteBuffer> input)
384         {
385             super(connector,config,endPoint,transport,input);
386         }
387         
388         @Override
389         public void earlyEOF()
390         {
391             // If we have no request yet, just close
392             if (getRequest().getMethod()==null)
393                 close();
394             else
395                 super.earlyEOF();
396         }
397 
398         @Override
399         public boolean content(ByteBuffer item)
400         {
401             super.content(item);
402             return true;
403         }
404 
405         @Override
406         public void badMessage(int status, String reason)
407         {
408             _generator.setPersistent(false);
409             super.badMessage(status,reason);
410         }
411 
412         @Override
413         public boolean headerComplete()
414         {
415             boolean persistent;
416             HttpVersion version = getHttpVersion();
417 
418             switch (version)
419             {
420                 case HTTP_0_9:
421                 {
422                     persistent = false;
423                     break;
424                 }
425                 case HTTP_1_0:
426                 {
427                     persistent = getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE.asString());
428                     if (!persistent)
429                         persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
430                     if (persistent)
431                         getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.KEEP_ALIVE);
432                     break;
433                 }
434                 case HTTP_1_1:
435                 {
436                     persistent = !getRequest().getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
437                     if (!persistent)
438                         persistent = HttpMethod.CONNECT.is(getRequest().getMethod());
439                     if (!persistent)
440                         getResponse().getHttpFields().add(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE);
441                     break;
442                 }
443                 default:
444                 {
445                     throw new IllegalStateException();
446                 }
447             }
448 
449             if (!persistent)
450                 _generator.setPersistent(false);
451 
452             return super.headerComplete();
453         }
454 
455         @Override
456         protected void handleException(Throwable x)
457         {
458             _generator.setPersistent(false);
459             super.handleException(x);
460         }
461 
462         @Override
463         public void failed()
464         {
465             getEndPoint().shutdownOutput();
466         }
467     }
468 
    /**
     * Iterating callback created by {@code send(ResponseInfo, ...)} to commit a response:
     * it drives the generator with the response meta data and writes the generated header,
     * any chunk framing and the content to the end point.
     */
    private class CommitCallback extends IteratingNestedCallback
    {
        final ByteBuffer _content;
        final boolean _lastContent;
        final ResponseInfo _info;
        // Set on NEED_HEADER: either a slice of spare space in _content or a pooled buffer
        ByteBuffer _header;

        CommitCallback(ResponseInfo info, ByteBuffer content, boolean last, Callback callback)
        {
            super(callback);
            _info=info;
            _content=content;
            _lastContent=last;
        }

        /**
         * Runs the generator until there is something to write or it is done.
         *
         * @return false after a write has been issued with this callback as its completion
         * callback (presumably process() is called again once that write completes - see
         * IteratingNestedCallback); true once the generator reports DONE
         * @throws IllegalStateException if the generator returns an unexpected result
         */
        @Override
        public boolean process() throws Exception
        {
            ByteBuffer chunk = _chunk;
            while (true)
            {
                HttpGenerator.Result result = _generator.generateResponse(_info, _header, chunk, _content, _lastContent);
                if (LOG.isDebugEnabled())
                    LOG.debug("{} generate: {} ({},{},{})@{}",
                        this,
                        result,
                        BufferUtil.toSummaryString(_header),
                        BufferUtil.toSummaryString(_content),
                        _lastContent,
                        _generator.getState());

                switch (result)
                {
                    case NEED_HEADER:
                    {
                        // Look for optimisation to avoid allocating a _header buffer
                        if (_lastContent && _content!=null && !_content.isReadOnly() && _content.hasArray() && BufferUtil.space(_content)>_config.getResponseHeaderSize() )
                        {
                            // use spare space in content buffer for header buffer
                            int p=_content.position();
                            int l=_content.limit();
                            // Temporarily window the region just past the content so it can be sliced
                            _content.position(l);
                            _content.limit(l+_config.getResponseHeaderSize());
                            _header=_content.slice();
                            // The slice starts out empty
                            _header.limit(0);
                            // Restore the content buffer's original position and limit
                            _content.position(p);
                            _content.limit(l);
                        }
                        else
                            _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
                        continue;
                    }
                    case NEED_CHUNK:
                    {
                        // The chunk buffer is kept on the connection; completed() releases it
                        chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
                        continue;
                    }
                    case FLUSH:
                    {
                        // Don't write the chunk or the content if this is a HEAD response
                        if (_channel.getRequest().isHead())
                        {
                            BufferUtil.clear(chunk);
                            BufferUtil.clear(_content);
                        }

                        // Write whichever of header, chunk and content actually have bytes, in that order
                        // If we have a header
                        if (BufferUtil.hasContent(_header))
                        {
                            if (BufferUtil.hasContent(_content))
                            {
                                if (BufferUtil.hasContent(chunk))
                                    getEndPoint().write(this, _header, chunk, _content);
                                else
                                    getEndPoint().write(this, _header, _content);
                            }
                            else
                                getEndPoint().write(this, _header);
                        }
                        else if (BufferUtil.hasContent(chunk))
                        {
                            if (BufferUtil.hasContent(_content))
                                getEndPoint().write(this, chunk, _content);
                            else
                                getEndPoint().write(this, chunk);
                        }
                        else if (BufferUtil.hasContent(_content))
                        {
                            getEndPoint().write(this, _content);
                        }
                        else
                            // Nothing to write: go round the generator again
                            continue;
                        // A write was issued with this callback
                        return false;
                    }
                    case SHUTDOWN_OUT:
                    {
                        getEndPoint().shutdownOutput();
                        continue;
                    }
                    case DONE:
                    {
                        if (_header!=null)
                        {
                            // don't release header in spare content buffer
                            if (!_lastContent || _content==null || !_content.hasArray() || !_header.hasArray() ||  _content.array()!=_header.array())
                                _bufferPool.release(_header);
                        }
                        return true;
                    }
                    case CONTINUE:
                    {
                        // Leaves the switch only; the enclosing loop calls the generator again
                        break;
                    }
                    default:
                    {
                        throw new IllegalStateException("generateResponse="+result);
                    }
                }
            }
        }
    }
590 
    /**
     * Iterating callback created by the {@code send(...)} methods to write content for a
     * response without supplying response meta data: it drives the generator with no info and
     * no header buffer and writes any chunk framing and the content to the end point.
     */
    private class ContentCallback extends IteratingNestedCallback
    {
        final ByteBuffer _content;
        final boolean _lastContent;

        ContentCallback(ByteBuffer content, boolean last, Callback callback)
        {
            super(callback);
            _content=content;
            _lastContent=last;
        }

        /**
         * Runs the generator until there is something to write or it is done.
         *
         * @return false after a write has been issued with this callback as its completion
         * callback (presumably process() is called again once that write completes - see
         * IteratingNestedCallback); true once the generator reports DONE
         * @throws IllegalStateException if the generator asks for a header or returns an
         * unexpected result
         */
        @Override
        public boolean process() throws Exception
        {
            ByteBuffer chunk = _chunk;
            while (true)
            {
                HttpGenerator.Result result = _generator.generateResponse(null, null, chunk, _content, _lastContent);
                if (LOG.isDebugEnabled())
                    LOG.debug("{} generate: {} ({},{})@{}",
                        this,
                        result,
                        BufferUtil.toSummaryString(_content),
                        _lastContent,
                        _generator.getState());

                switch (result)
                {
                    case NEED_HEADER:
                        // No response info is supplied here, so a header request cannot be satisfied
                        throw new IllegalStateException();
                    case NEED_CHUNK:
                    {
                        // The chunk buffer is kept on the connection; completed() releases it
                        chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
                        continue;
                    }
                    case FLUSH:
                    {
                        // Don't write the chunk or the content if this is a HEAD response
                        if (_channel.getRequest().isHead())
                        {
                            BufferUtil.clear(chunk);
                            BufferUtil.clear(_content);
                            continue;
                        }
                        else if (BufferUtil.hasContent(chunk))
                        {
                            if (BufferUtil.hasContent(_content))
                                getEndPoint().write(this, chunk, _content);
                            else
                                getEndPoint().write(this, chunk);
                        }
                        else if (BufferUtil.hasContent(_content))
                        {
                            getEndPoint().write(this, _content);
                        }
                        else
                            // Nothing to write: go round the generator again
                            continue;
                        // A write was issued with this callback
                        return false;
                    }
                    case SHUTDOWN_OUT:
                    {
                        getEndPoint().shutdownOutput();
                        continue;
                    }
                    case DONE:
                    {
                        return true;
                    }
                    case CONTINUE:
                    {
                        // Leaves the switch only; the enclosing loop calls the generator again
                        break;
                    }
                    default:
                    {
                        throw new IllegalStateException("generateResponse="+result);
                    }
                }
            }
        }
    }
672 
673 }