View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.WritePendingException;
24  import java.util.concurrent.RejectedExecutionException;
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import org.eclipse.jetty.http.HttpField;
28  import org.eclipse.jetty.http.HttpGenerator;
29  import org.eclipse.jetty.http.HttpHeader;
30  import org.eclipse.jetty.http.HttpHeaderValue;
31  import org.eclipse.jetty.http.HttpParser;
32  import org.eclipse.jetty.http.HttpParser.RequestHandler;
33  import org.eclipse.jetty.http.HttpStatus;
34  import org.eclipse.jetty.http.MetaData;
35  import org.eclipse.jetty.http.PreEncodedHttpField;
36  import org.eclipse.jetty.io.AbstractConnection;
37  import org.eclipse.jetty.io.ByteBufferPool;
38  import org.eclipse.jetty.io.Connection;
39  import org.eclipse.jetty.io.EndPoint;
40  import org.eclipse.jetty.io.EofException;
41  import org.eclipse.jetty.util.BufferUtil;
42  import org.eclipse.jetty.util.Callback;
43  import org.eclipse.jetty.util.IteratingCallback;
44  import org.eclipse.jetty.util.log.Log;
45  import org.eclipse.jetty.util.log.Logger;
46  
47  /**
48   * <p>A {@link Connection} that handles the HTTP protocol.</p>
49   */
50  public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, Connection.UpgradeFrom
51  {
52      private static final Logger LOG = Log.getLogger(HttpConnection.class);
53      public static final HttpField CONNECTION_CLOSE = new PreEncodedHttpField(HttpHeader.CONNECTION,HttpHeaderValue.CLOSE.asString());
54      public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
55      private static final boolean REQUEST_BUFFER_DIRECT=false;
56      private static final boolean HEADER_BUFFER_DIRECT=false;
57      private static final boolean CHUNK_BUFFER_DIRECT=false;
58      private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
59  
60      private final HttpConfiguration _config;
61      private final Connector _connector;
62      private final ByteBufferPool _bufferPool;
63      private final HttpInput _input;
64      private final HttpGenerator _generator;
65      private final HttpChannelOverHttp _channel;
66      private final HttpParser _parser;
67      private final AtomicInteger _contentBufferReferences=new AtomicInteger();
68      private volatile ByteBuffer _requestBuffer = null;
69      private volatile ByteBuffer _chunk = null;
70      private final BlockingReadCallback _blockingReadCallback = new BlockingReadCallback();
71      private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback();
72      private final SendCallback _sendCallback = new SendCallback();
73  
74      /**
75       * Get the current connection that this thread is dispatched to.
76       * Note that a thread may be processing a request asynchronously and
77       * thus not be dispatched to the connection.
78       * @return the current HttpConnection or null
79       * @see Request#getAttribute(String) for a more general way to access the HttpConnection
80       */
81      public static HttpConnection getCurrentConnection()
82      {
83          return __currentConnection.get();
84      }
85  
86      protected static HttpConnection setCurrentConnection(HttpConnection connection)
87      {
88          HttpConnection last=__currentConnection.get();
89          __currentConnection.set(connection);
90          return last;
91      }
92  
93      public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint)
94      {
95          super(endPoint, connector.getExecutor());
96          _config = config;
97          _connector = connector;
98          _bufferPool = _connector.getByteBufferPool();
99          _generator = newHttpGenerator();
100         _channel = newHttpChannel();
101         _input = _channel.getRequest().getHttpInput();
102         _parser = newHttpParser();
103         if (LOG.isDebugEnabled())
104             LOG.debug("New HTTP Connection {}", this);
105     }
106 
107     public HttpConfiguration getHttpConfiguration()
108     {
109         return _config;
110     }
111 
112     protected HttpGenerator newHttpGenerator()
113     {
114         return new HttpGenerator(_config.getSendServerVersion(),_config.getSendXPoweredBy());
115     }
116 
117     protected HttpChannelOverHttp newHttpChannel()
118     {
119         return new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this);
120     }
121 
122     protected HttpParser newHttpParser()
123     {
124         return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize());
125     }
126 
127     protected HttpParser.RequestHandler newRequestHandler()
128     {
129         return _channel;
130     }
131 
132     public Server getServer()
133     {
134         return _connector.getServer();
135     }
136 
137     public Connector getConnector()
138     {
139         return _connector;
140     }
141 
142     public HttpChannel getHttpChannel()
143     {
144         return _channel;
145     }
146 
147     public HttpParser getParser()
148     {
149         return _parser;
150     }
151 
152     public HttpGenerator getGenerator()
153     {
154         return _generator;
155     }
156 
157     @Override
158     public boolean isOptimizedForDirectBuffers()
159     {
160         return getEndPoint().isOptimizedForDirectBuffers();
161     }
162 
163     @Override
164     public int getMessagesIn()
165     {
166         return getHttpChannel().getRequests();
167     }
168 
169     @Override
170     public int getMessagesOut()
171     {
172         return getHttpChannel().getRequests();
173     }
174 
175     @Override
176     public ByteBuffer onUpgradeFrom()
177     {
178         if (BufferUtil.hasContent(_requestBuffer))
179         {
180             ByteBuffer buffer = _requestBuffer;
181             _requestBuffer=null;
182             return buffer;
183         }
184         return null;
185     }
186 
187     void releaseRequestBuffer()
188     {
189         if (_requestBuffer != null && !_requestBuffer.hasRemaining())
190         {
191             if (LOG.isDebugEnabled())
192                 LOG.debug("releaseRequestBuffer {}",this);
193             ByteBuffer buffer=_requestBuffer;
194             _requestBuffer=null;
195             _bufferPool.release(buffer);
196         }
197     }
198 
199     public ByteBuffer getRequestBuffer()
200     {
201         if (_requestBuffer == null)
202             _requestBuffer = _bufferPool.acquire(getInputBufferSize(), REQUEST_BUFFER_DIRECT);
203         return _requestBuffer;
204     }
205 
206     public boolean isRequestBufferEmpty()
207     {
208         return BufferUtil.isEmpty(_requestBuffer);
209     }
210 
211     @Override
212     public void onFillable()
213     {
214         if (LOG.isDebugEnabled())
215             LOG.debug("{} onFillable enter {}", this, _channel.getState());
216 
217         HttpConnection last=setCurrentConnection(this);
218         try
219         {
220             while (true)
221             {
222                 // Fill the request buffer (if needed)
223                 int filled = fillRequestBuffer();
224 
225                 // Parse the request buffer
226                 boolean handle = parseRequestBuffer();
227                 // If there was a connection upgrade, the other
228                 // connection took over, nothing more to do here.
229                 if (getEndPoint().getConnection()!=this)
230                     break;
231 
232                 // Handle close parser
233                 if (_parser.isClose() || _parser.isClosed())
234                 {
235                     close();
236                     break;
237                 }
238 
239                 // Handle channel event
240                 if (handle)
241                 {
242                     boolean suspended = !_channel.handle();
243 
244                     // We should break iteration if we have suspended or changed connection or this is not the handling thread.
245                     if (suspended || getEndPoint().getConnection() != this)
246                         break;
247                 }
248 
249                 // Continue or break?
250                 else if (filled<=0)
251                 {
252                     if (filled==0)
253                         fillInterested();
254                     break;
255                 }
256             }
257         }
258         finally
259         {
260             setCurrentConnection(last);
261             if (LOG.isDebugEnabled())
262                 LOG.debug("{} onFillable exit {}", this, _channel.getState());
263         }
264     }
265 
266     /* ------------------------------------------------------------ */
267     /** Fill and parse data looking for content
268      * @return true if an {@link RequestHandler} method was called and it returned true;
269      */
270     protected boolean fillAndParseForContent()
271     {
272         boolean handled=false;
273         while (_parser.inContentState())
274         {
275             if (LOG.isDebugEnabled())
276                 LOG.debug("{} parseContent",this);
277             int filled = fillRequestBuffer();
278             boolean handle = parseRequestBuffer();
279             handled|=handle;
280             if (handle || filled<=0 || _channel.getRequest().getHttpInput().hasContent())
281                 break;
282         }
283         return handled;
284     }
285 
286     /* ------------------------------------------------------------ */
287     private int fillRequestBuffer()
288     {
289         if (_contentBufferReferences.get()>0)
290         {
291             LOG.warn("{} fill with unconsumed content!",this);
292             return 0;
293         }
294 
295         if (BufferUtil.isEmpty(_requestBuffer))
296         {
297             // Can we fill?
298             if(getEndPoint().isInputShutdown())
299             {
300                 // No pretend we read -1
301                 _parser.atEOF();
302                 if (LOG.isDebugEnabled())
303                     LOG.debug("{} filled -1",this);
304                 return -1;
305             }
306 
307             // Get a buffer
308             // We are not in a race here for the request buffer as we have not yet received a request,
309             // so there are not an possible legal threads calling #parseContent or #completed.
310             _requestBuffer = getRequestBuffer();
311 
312             // fill
313             try
314             {
315                 int filled = getEndPoint().fill(_requestBuffer);
316                 if (filled==0) // Do a retry on fill 0 (optimization for SSL connections)
317                     filled = getEndPoint().fill(_requestBuffer);
318 
319                 // tell parser
320                 if (filled < 0)
321                     _parser.atEOF();
322 
323                 if (LOG.isDebugEnabled())
324                     LOG.debug("{} filled {}",this,filled);
325 
326                 return filled;
327             }
328             catch (IOException e)
329             {
330                 LOG.debug(e);
331                 return -1;
332             }
333         }
334         return 0;
335     }
336 
337     /* ------------------------------------------------------------ */
338     private boolean parseRequestBuffer()
339     {
340         if (LOG.isDebugEnabled())
341             LOG.debug("{} parse {} {}",this,BufferUtil.toDetailString(_requestBuffer));
342 
343         boolean handle = _parser.parseNext(_requestBuffer==null?BufferUtil.EMPTY_BUFFER:_requestBuffer);
344 
345         if (LOG.isDebugEnabled())
346             LOG.debug("{} parsed {} {}",this,handle,_parser);
347 
348         // recycle buffer ?
349         if (_contentBufferReferences.get()==0)
350             releaseRequestBuffer();
351 
352         return handle;
353     }
354 
355     /* ------------------------------------------------------------ */
356     @Override
357     public void onCompleted()
358     {
359         // Handle connection upgrades
360         if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
361         {
362             Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
363             if (connection != null)
364             {
365                 if (LOG.isDebugEnabled())
366                     LOG.debug("Upgrade from {} to {}", this, connection);
367                 _channel.getState().upgrade();
368                 getEndPoint().upgrade(connection);
369                 _channel.recycle();
370                 _parser.reset();
371                 _generator.reset();
372                 if (_contentBufferReferences.get()==0)
373                     releaseRequestBuffer();
374                 else
375                 {
376                     LOG.warn("{} lingering content references?!?!",this);
377                     _requestBuffer=null; // Not returned to pool!
378                     _contentBufferReferences.set(0);
379                 }
380                 return;
381             }
382         }
383 
384         // Finish consuming the request
385         // If we are still expecting
386         if (_channel.isExpecting100Continue())
387         {
388             // close to seek EOF
389             _parser.close();
390         }
391         else if (_parser.inContentState() && _generator.isPersistent())
392         {
393             // If we are async, then we have problems to complete neatly
394             if (_channel.getRequest().getHttpInput().isAsync())
395             {
396                 if (LOG.isDebugEnabled())
397                     LOG.debug("unconsumed async input {}", this);
398                 _channel.abort(new IOException("unconsumed input"));
399             }
400             else
401             {
402                 if (LOG.isDebugEnabled())
403                     LOG.debug("unconsumed input {}", this);
404                 // Complete reading the request
405                 if (!_channel.getRequest().getHttpInput().consumeAll())
406                     _channel.abort(new IOException("unconsumed input"));
407             }
408         }
409 
410         // Reset the channel, parsers and generator
411         _channel.recycle();
412         if (_generator.isPersistent() && !_parser.isClosed())
413             _parser.reset();
414         else
415             _parser.close();
416 
417         // Not in a race here with onFillable, because it has given up control before calling handle.
418         // in a slight race with #completed, but not sure what to do with that anyway.
419         if (_chunk!=null)
420             _bufferPool.release(_chunk);
421         _chunk=null;
422         _generator.reset();
423 
424         // if we are not called from the onfillable thread, schedule completion
425         if (getCurrentConnection()!=this)
426         {
427             // If we are looking for the next request
428             if (_parser.isStart())
429             {
430                 // if the buffer is empty
431                 if (BufferUtil.isEmpty(_requestBuffer))
432                 {
433                     // look for more data
434                     fillInterested();
435                 }
436                 // else if we are still running
437                 else if (getConnector().isRunning())
438                 {
439                     // Dispatched to handle a pipelined request
440                     try
441                     {
442                         getExecutor().execute(this);
443                     }
444                     catch (RejectedExecutionException e)
445                     {
446                         if (getConnector().isRunning())
447                             LOG.warn(e);
448                         else
449                             LOG.ignore(e);
450                         getEndPoint().close();
451                     }
452                 }
453                 else
454                 {
455                     getEndPoint().close();
456                 }
457             }
458             // else the parser must be closed, so seek the EOF if we are still open
459             else if (getEndPoint().isOpen())
460                 fillInterested();
461         }
462     }
463 
464     @Override
465     protected void onFillInterestedFailed(Throwable cause)
466     {
467         _parser.close();
468         super.onFillInterestedFailed(cause);
469     }
470 
471     @Override
472     public void onOpen()
473     {
474         super.onOpen();
475         fillInterested();
476     }
477 
478     @Override
479     public void onClose()
480     {
481         _sendCallback.close();
482         super.onClose();
483     }
484 
485     @Override
486     public void run()
487     {
488         onFillable();
489     }
490 
491     @Override
492     public void send(MetaData.Response info, boolean head, ByteBuffer content, boolean lastContent, Callback callback)
493     {
494         if (info == null)
495         {
496             if (!lastContent && BufferUtil.isEmpty(content))
497             {
498                 callback.succeeded();
499                 return;
500             }
501         }
502         else
503         {
504             // If we are still expecting a 100 continues when we commit
505             if (_channel.isExpecting100Continue())
506                 // then we can't be persistent
507                 _generator.setPersistent(false);
508         }
509 
510         if(_sendCallback.reset(info,head,content,lastContent,callback))
511             _sendCallback.iterate();
512     }
513 
514 
515     HttpInput.Content newContent(ByteBuffer c)
516     {
517         return new Content(c);
518     }
519 
520     @Override
521     public void abort(Throwable failure)
522     {
523         // Do a direct close of the output, as this may indicate to a client that the
524         // response is bad either with RST or by abnormal completion of chunked response.
525         getEndPoint().close();
526     }
527 
528     @Override
529     public boolean isPushSupported()
530     {
531         return false;
532     }
533 
534     @Override
535     public void push(org.eclipse.jetty.http.MetaData.Request request)
536     {
537         LOG.debug("ignore push in {}",this);
538     }
539 
540     public void asyncReadFillInterested()
541     {
542         getEndPoint().fillInterested(_asyncReadCallback);
543     }
544 
545     public void blockingReadFillInterested()
546     {
547         getEndPoint().fillInterested(_blockingReadCallback);
548     }
549 
550     public void blockingReadException(Throwable e)
551     {
552         _blockingReadCallback.failed(e);
553     }
554 
555     @Override
556     public String toString()
557     {
558         return String.format("%s[p=%s,g=%s,c=%s][b=%s]",
559                 super.toString(),
560                 _parser,
561                 _generator,
562                 _channel,
563                 BufferUtil.toDetailString(_requestBuffer));
564     }
565 
566     private class Content extends HttpInput.Content
567     {
568         public Content(ByteBuffer content)
569         {
570             super(content);
571             _contentBufferReferences.incrementAndGet();
572         }
573 
574         @Override
575         public void succeeded()
576         {
577             if (_contentBufferReferences.decrementAndGet()==0)
578                 releaseRequestBuffer();
579         }
580 
581         @Override
582         public void failed(Throwable x)
583         {
584             succeeded();
585         }
586     }
587 
588     private class BlockingReadCallback implements Callback
589     {
590         @Override
591         public void succeeded()
592         {
593             _input.unblock();
594         }
595 
596         @Override
597         public void failed(Throwable x)
598         {
599             _input.failed(x);
600         }
601 
602         @Override
603         public boolean isNonBlocking()
604         {
605             // This callback does not block, rather it wakes up the
606             // thread that is blocked waiting on the read.
607             return true;
608         }
609     }
610 
611     private class AsyncReadCallback implements Callback
612     {
613         @Override
614         public void succeeded()
615         {
616             if (fillAndParseForContent())
617                 _channel.handle();
618             else if (!_input.isFinished())
619                 asyncReadFillInterested();
620         }
621 
622         @Override
623         public void failed(Throwable x)
624         {
625             if (_input.failed(x))
626                 _channel.handle();
627         }
628     }
629 
630     private class SendCallback extends IteratingCallback
631     {
632         private MetaData.Response _info;
633         private boolean _head;
634         private ByteBuffer _content;
635         private boolean _lastContent;
636         private Callback _callback;
637         private ByteBuffer _header;
638         private boolean _shutdownOut;
639 
640         private SendCallback()
641         {
642             super(true);
643         }
644 
645         @Override
646         public boolean isNonBlocking()
647         {
648             return _callback.isNonBlocking();
649         }
650 
651         private boolean reset(MetaData.Response info, boolean head, ByteBuffer content, boolean last, Callback callback)
652         {
653             if (reset())
654             {
655                 _info = info;
656                 _head = head;
657                 _content = content;
658                 _lastContent = last;
659                 _callback = callback;
660                 _header = null;
661                 _shutdownOut = false;
662                 return true;
663             }
664 
665             if (isClosed())
666                 callback.failed(new EofException());
667             else
668                 callback.failed(new WritePendingException());
669             return false;
670         }
671 
672         @Override
673         public Action process() throws Exception
674         {
675             if (_callback==null)
676                 throw new IllegalStateException();
677 
678             ByteBuffer chunk = _chunk;
679             while (true)
680             {
681                 HttpGenerator.Result result = _generator.generateResponse(_info, _head, _header, chunk, _content, _lastContent);
682                 if (LOG.isDebugEnabled())
683                     LOG.debug("{} generate: {} ({},{},{})@{}",
684                         this,
685                         result,
686                         BufferUtil.toSummaryString(_header),
687                         BufferUtil.toSummaryString(_content),
688                         _lastContent,
689                         _generator.getState());
690 
691                 switch (result)
692                 {
693                     case NEED_HEADER:
694                     {
695                         // Look for optimisation to avoid allocating a _header buffer
696                         /*
697                          Cannot use this optimisation unless we work out how not to overwrite data in user passed arrays.
698                         if (_lastContent && _content!=null && !_content.isReadOnly() && _content.hasArray() && BufferUtil.space(_content)>_config.getResponseHeaderSize() )
699                         {
700                             // use spare space in content buffer for header buffer
701                             int p=_content.position();
702                             int l=_content.limit();
703                             _content.position(l);
704                             _content.limit(l+_config.getResponseHeaderSize());
705                             _header=_content.slice();
706                             _header.limit(0);
707                             _content.position(p);
708                             _content.limit(l);
709                         }
710                         else
711                         */
712                             _header = _bufferPool.acquire(_config.getResponseHeaderSize(), HEADER_BUFFER_DIRECT);
713 
714                         continue;
715                     }
716                     case NEED_CHUNK:
717                     {
718                         chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, CHUNK_BUFFER_DIRECT);
719                         continue;
720                     }
721                     case FLUSH:
722                     {
723                         // Don't write the chunk or the content if this is a HEAD response, or any other type of response that should have no content
724                         if (_head || _generator.isNoContent())
725                         {
726                             BufferUtil.clear(chunk);
727                             BufferUtil.clear(_content);
728                         }
729 
730                         // If we have a header
731                         if (BufferUtil.hasContent(_header))
732                         {
733                             if (BufferUtil.hasContent(_content))
734                             {
735                                 if (BufferUtil.hasContent(chunk))
736                                     getEndPoint().write(this, _header, chunk, _content);
737                                 else
738                                     getEndPoint().write(this, _header, _content);
739                             }
740                             else
741                                 getEndPoint().write(this, _header);
742                         }
743                         else if (BufferUtil.hasContent(chunk))
744                         {
745                             if (BufferUtil.hasContent(_content))
746                                 getEndPoint().write(this, chunk, _content);
747                             else
748                                 getEndPoint().write(this, chunk);
749                         }
750                         else if (BufferUtil.hasContent(_content))
751                         {
752                             getEndPoint().write(this, _content);
753                         }
754                         else
755                         {
756                             succeeded(); // nothing to write
757                         }
758                         return Action.SCHEDULED;
759                     }
760                     case SHUTDOWN_OUT:
761                     {
762                         _shutdownOut=true;
763                         continue;
764                     }
765                     case DONE:
766                     {
767                         return Action.SUCCEEDED;
768                     }
769                     case CONTINUE:
770                     {
771                         break;
772                     }
773                     default:
774                     {
775                         throw new IllegalStateException("generateResponse="+result);
776                     }
777                 }
778             }
779         }
780 
781         private void releaseHeader()
782         {
783             ByteBuffer h=_header;
784             _header=null;
785             if (h!=null)
786                 _bufferPool.release(h);
787         }
788 
789         @Override
790         protected void onCompleteSuccess()
791         {
792             releaseHeader();
793             _callback.succeeded();
794             if (_shutdownOut)
795                 getEndPoint().shutdownOutput();
796         }
797 
798         @Override
799         public void onCompleteFailure(final Throwable x)
800         {
801             releaseHeader();
802             failedCallback(_callback,x);
803             if (_shutdownOut)
804                 getEndPoint().shutdownOutput();
805         }
806 
807         @Override
808         public String toString()
809         {
810             return String.format("%s[i=%s,cb=%s]",super.toString(),_info,_callback);
811         }
812     }
813 }