View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.WritePendingException;
24  import java.util.concurrent.RejectedExecutionException;
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import org.eclipse.jetty.http.HttpField;
28  import org.eclipse.jetty.http.HttpGenerator;
29  import org.eclipse.jetty.http.HttpHeader;
30  import org.eclipse.jetty.http.HttpHeaderValue;
31  import org.eclipse.jetty.http.HttpParser;
32  import org.eclipse.jetty.http.HttpParser.RequestHandler;
33  import org.eclipse.jetty.http.HttpStatus;
34  import org.eclipse.jetty.http.MetaData;
35  import org.eclipse.jetty.http.PreEncodedHttpField;
36  import org.eclipse.jetty.io.AbstractConnection;
37  import org.eclipse.jetty.io.ByteBufferPool;
38  import org.eclipse.jetty.io.Connection;
39  import org.eclipse.jetty.io.EndPoint;
40  import org.eclipse.jetty.io.EofException;
41  import org.eclipse.jetty.util.BufferUtil;
42  import org.eclipse.jetty.util.Callback;
43  import org.eclipse.jetty.util.IteratingCallback;
44  import org.eclipse.jetty.util.log.Log;
45  import org.eclipse.jetty.util.log.Logger;
46  
47  /**
48   * <p>A {@link Connection} that handles the HTTP protocol.</p>
49   */
50  public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom
51  {
52      private static final Logger LOG = Log.getLogger(HttpConnection.class); // logger shared by all instances of this class
        // Pre-encoded header field built from HttpHeader.CONNECTION and HttpHeaderValue.CLOSE.
53      public static final HttpField CONNECTION_CLOSE = new PreEncodedHttpField(HttpHeader.CONNECTION,HttpHeaderValue.CLOSE.asString());
        // Name of the request attribute that onCompleted() reads to find the Connection
        // to upgrade to when the response status is 101.
54      public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
        // Second argument passed to ByteBufferPool.acquire(...) for the request buffer (getRequestBuffer()).
55      private static final boolean REQUEST_BUFFER_DIRECT=false;
        // Second argument passed to ByteBufferPool.acquire(...) for the response header buffer (SendCallback.process()).
56      private static final boolean HEADER_BUFFER_DIRECT=false;
        // Second argument passed to ByteBufferPool.acquire(...) for the chunk buffer (SendCallback.process()).
57      private static final boolean CHUNK_BUFFER_DIRECT=false;
        // Per-thread slot written by setCurrentConnection(...) and read by getCurrentConnection();
        // onFillable() sets it on entry and restores the previous value on exit.
58      private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
59  
        // Collaborators assigned once in the constructor.
60      private final HttpConfiguration _config;
61      private final Connector _connector;
        // Pool taken from the connector; source of the request, header and chunk buffers.
62      private final ByteBufferPool _bufferPool;
        // The HttpInput of the channel's request, as obtained in the constructor.
63      private final HttpInput _input;
64      private final HttpGenerator _generator;
65      private final HttpChannelOverHttp _channel;
66      private final HttpParser _parser;
        // Incremented by each Content created via newContent(...) and decremented when that Content
        // succeeds or fails; while it is above zero fillRequestBuffer() does not fill and
        // parseRequestBuffer() does not release the request buffer.
67      private final AtomicInteger _contentBufferReferences=new AtomicInteger();
        // Lazily acquired in getRequestBuffer(); returned to the pool by releaseRequestBuffer()
        // once it has no remaining bytes.
68      private volatile ByteBuffer _requestBuffer = null;
        // Acquired by SendCallback.process() when the generator asks for a chunk buffer;
        // released and cleared in onCompleted().
69      private volatile ByteBuffer _chunk = null;
        // Callback registered by blockingReadFillInterested().
70      private final BlockingReadCallback _blockingReadCallback = new BlockingReadCallback();
        // Callback registered by asyncReadFillInterested().
71      private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback();
        // Single reusable iterating callback that drives every send(...).
72      private final SendCallback _sendCallback = new SendCallback();
73  
74      /**
75       * Get the current connection that this thread is dispatched to.
76       * Note that a thread may be processing a request asynchronously and
77       * thus not be dispatched to the connection.
78       * @return the current HttpConnection or null
79       * @see Request#getAttribute(String) for a more general way to access the HttpConnection
80       */
81      public static HttpConnection getCurrentConnection()
82      {
83          return __currentConnection.get();
84      }
85  
86      protected static HttpConnection setCurrentConnection(HttpConnection connection)
87      {
88          HttpConnection last=__currentConnection.get();
89          __currentConnection.set(connection);
90          return last;
91      }
92  
93      public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
94      {
95          super(endPoint, connector.getExecutor());
96          _config = config;
97          _connector = connector;
98          _bufferPool = _connector.getByteBufferPool();
99          _generator = newHttpGenerator();
100         _channel = newHttpChannel();
101         _input = _channel.getRequest().getHttpInput();
102         _parser = newHttpParser();
103         if (LOG.isDebugEnabled())
104             LOG.debug("New HTTP Connection {}", this);
105     }
106 
107     public HttpConfiguration getHttpConfiguration()
108     {
109         return _config;
110     }
111 
112     protected HttpGenerator newHttpGenerator()
113     {
114         return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
115     }
116 
117     protected HttpChannelOverHttp newHttpChannel()
118     {
119         return new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this);
120     }
121 
122     protected HttpParser newHttpParser()
123     {
124         return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
125     }
126 
127     protected HttpParser.RequestHandler newRequestHandler()
128     {
129         return _channel;
130     }
131 
132     public Server getServer()
133     {
134         return _connector.getServer();
135     }
136 
137     public Connector getConnector()
138     {
139         return _connector;
140     }
141 
142     public HttpChannel getHttpChannel()
143     {
144         return _channel;
145     }
146 
147     public HttpParser getParser()
148     {
149         return _parser;
150     }
151 
152     public HttpGenerator getGenerator()
153     {
154         return _generator;
155     }
156 
157     @Override
158     public boolean isOptimizedForDirectBuffers()
159     {
160         return getEndPoint().isOptimizedForDirectBuffers();
161     }
162 
163     @Override
164     public int getMessagesIn()
165     {
166         return getHttpChannel().getRequests();
167     }
168 
169     @Override
170     public int getMessagesOut()
171     {
172         return getHttpChannel().getRequests();
173     }
174 
175     @Override
176     public ByteBuffer onUpgradeFrom()
177     {
178         if (BufferUtil.hasContent(_requestBuffer))
179         {
180             ByteBuffer buffer = _requestBuffer;
181             _requestBuffer=null;
182             return buffer;
183         }
184         return null;
185     }
186 
187     void releaseRequestBuffer()
188     {
189         if (_requestBuffer != null && !_requestBuffer.hasRemaining())
190         {
191             if (LOG.isDebugEnabled())
192                 LOG.debug("releaseRequestBuffer {}",this);
193             ByteBuffer buffer=_requestBuffer;
194             _requestBuffer=null;
195             _bufferPool.release(buffer);
196         }
197     }
198 
199     public ByteBuffer getRequestBuffer()
200     {
201         if (_requestBuffer == null)
202             _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
203         return _requestBuffer;
204     }
205 
206     public boolean isRequestBufferEmpty()
207     {
208         return BufferUtil.isEmpty(_requestBuffer);
209     }
210 
211     @Override
212     public void onFillable()
213     {
214         if (LOG.isDebugEnabled())
215             LOG.debug("{} onFillable enter {} {}", this, _channel.getState(),BufferUtil.toDetailString(_requestBuffer));
216 
217         HttpConnection last=setCurrentConnection(this);
218         try
219         {
220             while (true)
221             {
222                 // Fill the request buffer (if needed)
223                 int filled = fillRequestBuffer();
224 
225                 // Parse the request buffer
226                 boolean handle = parseRequestBuffer();
227                 // If there was a connection upgrade, the other
228                 // connection took over, nothing more to do here.
229                 if (getEndPoint().getConnection()!=this)
230                     break;
231 
232                 // Handle close parser
233                 if (_parser.isClose() || _parser.isClosed())
234                 {
235                     close();
236                     break;
237                 }
238 
239                 // Handle channel event
240                 if (handle)
241                 {
242                     boolean suspended = !_channel.handle();
243 
244                     // We should break iteration if we have suspended or changed connection or this is not the handling thread.
245                     if (suspended || getEndPoint().getConnection() != this)
246                         break;
247                 }
248 
249                 // Continue or break?
250                 else if (filled<=0)
251                 {
252                     if (filled==0)
253                         fillInterested();
254                     break;
255                 }
256             }
257         }
258         finally
259         {
260             setCurrentConnection(last);
261             if (LOG.isDebugEnabled())
262                 LOG.debug("{} onFillable exit {} {}", this, _channel.getState(),BufferUtil.toDetailString(_requestBuffer));
263         }
264     }
265 
266     /* ------------------------------------------------------------ */
267     /** Fill and parse data looking for content
268      * @return true if an {@link RequestHandler} method was called and it returned true;
269      */
270     protected boolean fillAndParseForContent()
271     {
272         boolean handled=false;
273         while (_parser.inContentState())
274         {
275             int filled = fillRequestBuffer();
276             boolean handle = parseRequestBuffer();
277             handled|=handle;
278             if (handle || filled<=0 || _channel.getRequest().getHttpInput().hasContent())
279                 break;
280         }
281         return handled;
282     }
283 
284     /* ------------------------------------------------------------ */
285     private int fillRequestBuffer()
286     {
287         if (_contentBufferReferences.get()>0)
288         {
289             LOG.warn("{} fill with unconsumed content!",this);
290             return 0;
291         }
292 
293         if (BufferUtil.isEmpty(_requestBuffer))
294         {
295             // Can we fill?
296             if(getEndPoint().isInputShutdown())
297             {
298                 // No pretend we read -1
299                 _parser.atEOF();
300                 if (LOG.isDebugEnabled())
301                     LOG.debug("{} filled -1 {}",this,BufferUtil.toDetailString(_requestBuffer));
302                 return -1;
303             }
304 
305             // Get a buffer
306             // We are not in a race here for the request buffer as we have not yet received a request,
307             // so there are not an possible legal threads calling #parseContent or #completed.
308             _requestBuffer = getRequestBuffer();
309 
310             // fill
311             try
312             {
313                 int filled = getEndPoint().fill(_requestBuffer);
314                 if (filled==0) // Do a retry on fill 0 (optimization for SSL connections)
315                     filled = getEndPoint().fill(_requestBuffer);
316 
317                 // tell parser
318                 if (filled < 0)
319                     _parser.atEOF();
320 
321                 if (LOG.isDebugEnabled())
322                     LOG.debug("{} filled {} {}",this,filled,BufferUtil.toDetailString(_requestBuffer));
323 
324                 return filled;
325             }
326             catch (IOException e)
327             {
328                 LOG.debug(e);
329                 return -1;
330             }
331         }
332         return 0;
333     }
334 
335     /* ------------------------------------------------------------ */
336     private boolean parseRequestBuffer()
337     {
338         if (LOG.isDebugEnabled())
339             LOG.debug("{} parse {} {}",this,BufferUtil.toDetailString(_requestBuffer));
340 
341         boolean handle = _parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
342 
343         if (LOG.isDebugEnabled())
344             LOG.debug("{} parsed {} {}",this,handle,_parser);
345 
346         // recycle buffer ?
347         if (_contentBufferReferences.get()==0)
348             releaseRequestBuffer();
349 
350         return handle;
351     }
352 
353     /* ------------------------------------------------------------ */
354     @Override
355     public void onCompleted()
356     {
357         // Handle connection upgrades
358         if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
359         {
360             Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
361             if (connection != null)
362             {
363                 if (LOG.isDebugEnabled())
364                     LOG.debug("Upgrade from {} to {}", this, connection);
365                 _channel.getState().upgrade();
366                 getEndPoint().upgrade(connection);
367                 _channel.recycle();
368                 _parser.reset();
369                 _generator.reset();
370                 if (_contentBufferReferences.get()==0)
371                     releaseRequestBuffer();
372                 else
373                 {
374                     LOG.warn("{} lingering content references?!?!",this);
375                     _requestBuffer=null; // Not returned to pool!
376                     _contentBufferReferences.set(0);
377                 }
378                 return;
379             }
380         }
381 
382         // Finish consuming the request
383         // If we are still expecting
384         if (_channel.isExpecting100Continue())
385         {
386             // close to seek EOF
387             _parser.close();
388         }
389         else if (_parser.inContentState() && _generator.isPersistent())
390         {
391             // If we are async, then we have problems to complete neatly
392             if (_channel.getRequest().getHttpInput().isAsync())
393             {
394                 if (LOG.isDebugEnabled())
395                     LOG.debug("unconsumed async input {}", this);
396                 _channel.abort(new IOException("unconsumed input"));
397             }
398             else
399             {
400                 if (LOG.isDebugEnabled())
401                     LOG.debug("unconsumed input {}", this);
402                 // Complete reading the request
403                 if (!_channel.getRequest().getHttpInput().consumeAll())
404                     _channel.abort(new IOException("unconsumed input"));
405             }
406         }
407 
408         // Reset the channel, parsers and generator
409         _channel.recycle();
410         if (_generator.isPersistent() && !_parser.isClosed())
411             _parser.reset();
412         else
413             _parser.close();
414 
415         // Not in a race here with onFillable, because it has given up control before calling handle.
416         // in a slight race with #completed, but not sure what to do with that anyway.
417         if (_chunk!=null)
418             _bufferPool.release(_chunk);
419         _chunk=null;
420         _generator.reset();
421 
422         // if we are not called from the onfillable thread, schedule completion
423         if (getCurrentConnection()!=this)
424         {
425             // If we are looking for the next request
426             if (_parser.isStart())
427             {
428                 // if the buffer is empty
429                 if (BufferUtil.isEmpty(_requestBuffer))
430                 {
431                     // look for more data
432                     fillInterested();
433                 }
434                 // else if we are still running
435                 else if (getConnector().isRunning())
436                 {
437                     // Dispatched to handle a pipelined request
438                     try
439                     {
440                         getExecutor().execute(this);
441                     }
442                     catch (RejectedExecutionException e)
443                     {
444                         if (getConnector().isRunning())
445                             LOG.warn(e);
446                         else
447                             LOG.ignore(e);
448                         getEndPoint().close();
449                     }
450                 }
451                 else
452                 {
453                     getEndPoint().close();
454                 }
455             }
456             // else the parser must be closed, so seek the EOF if we are still open
457             else if (getEndPoint().isOpen())
458                 fillInterested();
459         }
460     }
461 
462     @Override
463     protected void onFillInterestedFailed(Throwable cause)
464     {
465         _parser.close();
466         super.onFillInterestedFailed(cause);
467     }
468 
469     @Override
470     public void onOpen()
471     {
472         super.onOpen();
473         fillInterested();
474     }
475 
476     @Override
477     public void onClose()
478     {
479         _sendCallback.close();
480         super.onClose();
481     }
482 
483     @Override
484     public void run()
485     {
486         onFillable();
487     }
488 
489     @Override
490     public void send(MetaData.Response info, boolean head, ByteBuffer content, boolean lastContent, Callback callback)
491     {
492         if (info == null)
493         {
494             if (!lastContent && BufferUtil.isEmpty(content))
495             {
496                 callback.succeeded();
497                 return;
498             }
499         }
500         else
501         {
502             // If we are still expecting a 100 continues when we commit
503             if (_channel.isExpecting100Continue())
504                 // then we can't be persistent
505                 _generator.setPersistent(false);
506         }
507 
508         if(_sendCallback.reset(info,head,content,lastContent,callback))
509             _sendCallback.iterate();
510     }
511 
512 
513     HttpInput.Content newContent(ByteBuffer c)
514     {
515         return new Content(c);
516     }
517 
518     @Override
519     public void abort(Throwable failure)
520     {
521         // Do a direct close of the output, as this may indicate to a client that the
522         // response is bad either with RST or by abnormal completion of chunked response.
523         getEndPoint().close();
524     }
525 
526     @Override
527     public boolean isPushSupported()
528     {
529         return false;
530     }
531 
532     @Override
533     public void push(org.eclipse.jetty.http.MetaData.Request request)
534     {
535         LOG.debug("ignore push in {}",this);
536     }
537 
538     public void asyncReadFillInterested()
539     {
540         getEndPoint().fillInterested(_asyncReadCallback);
541     }
542 
543     public void blockingReadFillInterested()
544     {
545         getEndPoint().fillInterested(_blockingReadCallback);
546     }
547 
548     public void blockingReadException(Throwable e)
549     {
550         _blockingReadCallback.failed(e);
551     }
552 
553     @Override
554     public String toString()
555     {
556         return String.format("%s[p=%s,g=%s,c=%s]",
557                 super.toString(),
558                 _parser,
559                 _generator,
560                 _channel);
561     }
562 
563     private class Content extends HttpInput.Content
564     {
565         public Content(ByteBuffer content)
566         {
567             super(content);
568             _contentBufferReferences.incrementAndGet();
569         }
570 
571         @Override
572         public void succeeded()
573         {
574             if (_contentBufferReferences.decrementAndGet()==0)
575                 releaseRequestBuffer();
576         }
577 
578         @Override
579         public void failed(Throwable x)
580         {
581             succeeded();
582         }
583     }
584 
585     private class BlockingReadCallback implements Callback
586     {
587         @Override
588         public void succeeded()
589         {
590             _input.unblock();
591         }
592 
593         @Override
594         public void failed(Throwable x)
595         {
596             _input.failed(x);
597         }
598 
599         @Override
600         public boolean isNonBlocking()
601         {
602             // This callback does not block, rather it wakes up the
603             // thread that is blocked waiting on the read.
604             return true;
605         }
606     }
607 
608     private class AsyncReadCallback implements Callback
609     {
610         @Override
611         public void succeeded()
612         {
613             if (fillAndParseForContent())
614                 _channel.handle();
615             else if (!_input.isFinished())
616                 asyncReadFillInterested();
617         }
618 
619         @Override
620         public void failed(Throwable x)
621         {
622             if (_input.failed(x))
623                 _channel.handle();
624         }
625     }
626 
627     private class SendCallback extends IteratingCallback
628     {
629         private MetaData.Response _info;
630         private boolean _head;
631         private ByteBuffer _content;
632         private boolean _lastContent;
633         private Callback _callback;
634         private ByteBuffer _header;
635         private boolean _shutdownOut;
636 
637         private SendCallback()
638         {
639             super(true);
640         }
641 
642         @Override
643         public boolean isNonBlocking()
644         {
645             return _callback.isNonBlocking();
646         }
647 
648         private boolean reset(MetaData.Response info, boolean head, ByteBuffer content, boolean last, Callback callback)
649         {
650             if (reset())
651             {
652                 _info = info;
653                 _head = head;
654                 _content = content;
655                 _lastContent = last;
656                 _callback = callback;
657                 _header = null;
658                 _shutdownOut = false;
659                 return true;
660             }
661 
662             if (isClosed())
663                 callback.failed(new EofException());
664             else
665                 callback.failed(new WritePendingException());
666             return false;
667         }
668 
669         @Override
670         public Action process() throws Exception
671         {
672             if (_callback==null)
673                 throw new IllegalStateException();
674 
675             ByteBuffer chunk = _chunk;
676             while (true)
677             {
678                 HttpGenerator.Result result = _generator.generateResponse(_info, _head, _header, chunk, _content, _lastContent);
679                 if (LOG.isDebugEnabled())
680                     LOG.debug("{} generate: {} ({},{},{})@{}",
681                         this,
682                         result,
683                         BufferUtil.toSummaryString(_header),
684                         BufferUtil.toSummaryString(_content),
685                         _lastContent,
686                         _generator.getState());
687 
688                 switch (result)
689                 {
690                     case NEED_HEADER:
691                     {
692                         _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
693 
694                         continue;
695                     }
696                     case NEED_CHUNK:
697                     {
698                         chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
699                         continue;
700                     }
701                     case FLUSH:
702                     {
703                         // Don't write the chunk or the content if this is a HEAD response, or any other type of response that should have no content
704                         if (_head || _generator.isNoContent())
705                         {
706                             BufferUtil.clear(chunk);
707                             BufferUtil.clear(_content);
708                         }
709 
710                         // If we have a header
711                         if (BufferUtil.hasContent(_header))
712                         {
713                             if (BufferUtil.hasContent(_content))
714                             {
715                                 if (BufferUtil.hasContent(chunk))
716                                     getEndPoint().write(this, _header, chunk, _content);
717                                 else
718                                     getEndPoint().write(this, _header, _content);
719                             }
720                             else
721                                 getEndPoint().write(this, _header);
722                         }
723                         else if (BufferUtil.hasContent(chunk))
724                         {
725                             if (BufferUtil.hasContent(_content))
726                                 getEndPoint().write(this, chunk, _content);
727                             else
728                                 getEndPoint().write(this, chunk);
729                         }
730                         else if (BufferUtil.hasContent(_content))
731                         {
732                             getEndPoint().write(this, _content);
733                         }
734                         else
735                         {
736                             succeeded(); // nothing to write
737                         }
738                         return Action.SCHEDULED;
739                     }
740                     case SHUTDOWN_OUT:
741                     {
742                         _shutdownOut=true;
743                         continue;
744                     }
745                     case DONE:
746                     {
747                         return Action.SUCCEEDED;
748                     }
749                     case CONTINUE:
750                     {
751                         break;
752                     }
753                     default:
754                     {
755                         throw new IllegalStateException("generateResponse="+result);
756                     }
757                 }
758             }
759         }
760 
761         private void releaseHeader()
762         {
763             ByteBuffer h=_header;
764             _header=null;
765             if (h!=null)
766                 _bufferPool.release(h);
767         }
768 
769         @Override
770         protected void onCompleteSuccess()
771         {
772             releaseHeader();
773             _callback.succeeded();
774             if (_shutdownOut)
775                 getEndPoint().shutdownOutput();
776         }
777 
778         @Override
779         public void onCompleteFailure(final Throwable x)
780         {
781             releaseHeader();
782             failedCallback(_callback,x);
783             if (_shutdownOut)
784                 getEndPoint().shutdownOutput();
785         }
786 
787         @Override
788         public String toString()
789         {
790             return String.format("%s[i=%s,cb=%s]",super.toString(),_info,_callback);
791         }
792     }
793 }