View Javadoc

//
//  ========================================================================
//  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//
18  
19  package org.eclipse.jetty.server;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.nio.ByteBuffer;
24  import java.nio.channels.ClosedChannelException;
25  import java.util.List;
26  import java.util.concurrent.TimeoutException;
27  import java.util.concurrent.atomic.AtomicBoolean;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import javax.servlet.DispatcherType;
31  import javax.servlet.RequestDispatcher;
32  import javax.servlet.UnavailableException;
33  import javax.servlet.http.HttpServletRequest;
34  
35  import org.eclipse.jetty.http.BadMessageException;
36  import org.eclipse.jetty.http.HttpFields;
37  import org.eclipse.jetty.http.HttpGenerator;
38  import org.eclipse.jetty.http.HttpHeader;
39  import org.eclipse.jetty.http.HttpHeaderValue;
40  import org.eclipse.jetty.http.HttpStatus;
41  import org.eclipse.jetty.http.HttpVersion;
42  import org.eclipse.jetty.http.MetaData;
43  import org.eclipse.jetty.io.ByteBufferPool;
44  import org.eclipse.jetty.io.ChannelEndPoint;
45  import org.eclipse.jetty.io.EndPoint;
46  import org.eclipse.jetty.io.EofException;
47  import org.eclipse.jetty.server.HttpChannelState.Action;
48  import org.eclipse.jetty.server.handler.ContextHandler;
49  import org.eclipse.jetty.server.handler.ErrorHandler;
50  import org.eclipse.jetty.util.BufferUtil;
51  import org.eclipse.jetty.util.Callback;
52  import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
53  import org.eclipse.jetty.util.log.Log;
54  import org.eclipse.jetty.util.log.Logger;
55  import org.eclipse.jetty.util.thread.Scheduler;
56  
57  
58  /**
59   * HttpChannel represents a single endpoint for HTTP semantic processing.
60   * The HttpChannel is both a HttpParser.RequestHandler, where it passively receives events from
61   * an incoming HTTP request, and a Runnable, where it actively takes control of the request/response
62   * life cycle and calls the application (perhaps suspending and resuming with multiple calls to run).
63   * The HttpChannel signals the switch from passive mode to active mode by returning true to one of the
64   * HttpParser.RequestHandler callbacks.   The completion of the active phase is signalled by a call to
65   * HttpTransport.completed().
66   *
67   */
68  public class HttpChannel implements Runnable, HttpOutput.Interceptor
69  {
70      private static final Logger LOG = Log.getLogger(HttpChannel.class);
71      private final AtomicBoolean _committed = new AtomicBoolean();
72      private final AtomicInteger _requests = new AtomicInteger();
73      private final Connector _connector;
74      private final HttpConfiguration _configuration;
75      private final EndPoint _endPoint;
76      private final HttpTransport _transport;
77      private final HttpChannelState _state;
78      private final Request _request;
79      private final Response _response;
80      private MetaData.Response _committedMetaData;
81      private RequestLog _requestLog;
82  
83      /** Bytes written after interception (eg after compression) */
84      private long _written;
85  
86      public HttpChannel(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransport transport)
87      {
88          _connector = connector;
89          _configuration = configuration;
90          _endPoint = endPoint;
91          _transport = transport;
92  
93          _state = new HttpChannelState(this);
94          _request = new Request(this, newHttpInput(_state));
95          _response = new Response(this, newHttpOutput());
96          _requestLog=_connector==null?null:_connector.getServer().getRequestLog();
97          if (LOG.isDebugEnabled())
98              LOG.debug("new {} -> {},{},{}",this,_endPoint,_endPoint.getConnection(),_state);
99      }
100 
101     protected HttpInput newHttpInput(HttpChannelState state)
102     {
103         return new HttpInput(state);
104     }
105 
106     protected HttpOutput newHttpOutput()
107     {
108         return new HttpOutput(this);
109     }
110 
111     public HttpChannelState getState()
112     {
113         return _state;
114     }
115 
116     public long getBytesWritten()
117     {
118         return _written;
119     }
120 
121     /**
122      * @return the number of requests handled by this connection
123      */
124     public int getRequests()
125     {
126         return _requests.get();
127     }
128 
129     public Connector getConnector()
130     {
131         return _connector;
132     }
133 
134     public HttpTransport getHttpTransport()
135     {
136         return _transport;
137     }
138 
139     public RequestLog getRequestLog()
140     {
141         return _requestLog;
142     }
143 
144     public void setRequestLog(RequestLog requestLog)
145     {
146         _requestLog = requestLog;
147     }
148 
149     public void addRequestLog(RequestLog requestLog)
150     {
151         if (_requestLog==null)
152             _requestLog = requestLog;
153         else if (_requestLog instanceof RequestLogCollection)
154             ((RequestLogCollection) _requestLog).add(requestLog);
155         else
156             _requestLog = new RequestLogCollection(_requestLog, requestLog);
157     }
158 
159     public MetaData.Response getCommittedMetaData()
160     {
161         return _committedMetaData;
162     }
163 
164     /**
165      * Get the idle timeout.
166      * <p>This is implemented as a call to {@link EndPoint#getIdleTimeout()}, but may be
167      * overridden by channels that have timeouts different from their connections.
168      * @return the idle timeout (in milliseconds)
169      */
170     public long getIdleTimeout()
171     {
172         return _endPoint.getIdleTimeout();
173     }
174 
175     /**
176      * Set the idle timeout.
177      * <p>This is implemented as a call to {@link EndPoint#setIdleTimeout(long)}, but may be
178      * overridden by channels that have timeouts different from their connections.
179      * @param timeoutMs the idle timeout in milliseconds
180      */
181     public void setIdleTimeout(long timeoutMs)
182     {
183         _endPoint.setIdleTimeout(timeoutMs);
184     }
185 
186     public ByteBufferPool getByteBufferPool()
187     {
188         return _connector.getByteBufferPool();
189     }
190 
191     public HttpConfiguration getHttpConfiguration()
192     {
193         return _configuration;
194     }
195 
196     @Override
197     public boolean isOptimizedForDirectBuffers()
198     {
199         return getHttpTransport().isOptimizedForDirectBuffers();
200     }
201 
202     public Server getServer()
203     {
204         return _connector.getServer();
205     }
206 
207     public Request getRequest()
208     {
209         return _request;
210     }
211 
212     public Response getResponse()
213     {
214         return _response;
215     }
216 
217     public EndPoint getEndPoint()
218     {
219         return _endPoint;
220     }
221 
222     public InetSocketAddress getLocalAddress()
223     {
224         return _endPoint.getLocalAddress();
225     }
226 
227     public InetSocketAddress getRemoteAddress()
228     {
229         return _endPoint.getRemoteAddress();
230     }
231 
232     /**
233      * If the associated response has the Expect header set to 100 Continue,
234      * then accessing the input stream indicates that the handler/servlet
235      * is ready for the request body and thus a 100 Continue response is sent.
236      *
237      * @param available estimate of the number of bytes that are available
238      * @throws IOException if the InputStream cannot be created
239      */
240     public void continue100(int available) throws IOException
241     {
242         throw new UnsupportedOperationException();
243     }
244 
245     public void recycle()
246     {
247         _committed.set(false);
248         _request.recycle();
249         _response.recycle();
250         _committedMetaData=null;
251         _requestLog=_connector==null?null:_connector.getServer().getRequestLog();
252         _written=0;
253     }
254 
255     public void asyncReadFillInterested()
256     {
257     }
258 
259     @Override
260     public void run()
261     {
262         handle();
263     }
264 
265     /**
266      * @return True if the channel is ready to continue handling (ie it is not suspended)
267      */
268     public boolean handle()
269     {
270         if (LOG.isDebugEnabled())
271             LOG.debug("{} handle {} ", this,_request.getHttpURI());
272 
273         HttpChannelState.Action action = _state.handling();
274 
275         // Loop here to handle async request redispatches.
276         // The loop is controlled by the call to async.unhandle in the
277         // finally block below.  Unhandle will return false only if an async dispatch has
278         // already happened when unhandle is called.
279         loop: while (!getServer().isStopped())
280         {
281             try
282             {
283                 if (LOG.isDebugEnabled())
284                     LOG.debug("{} action {}",this,action);
285 
286                 switch(action)
287                 {
288                     case TERMINATED:
289                     case WAIT:
290                         break loop;
291 
292                     case DISPATCH:
293                     {
294                         if (!_request.hasMetaData())
295                             throw new IllegalStateException("state=" + _state);
296                         _request.setHandled(false);
297                         _response.getHttpOutput().reopen();
298                         _request.setDispatcherType(DispatcherType.REQUEST);
299 
300                         List<HttpConfiguration.Customizer> customizers = _configuration.getCustomizers();
301                         if (!customizers.isEmpty())
302                         {
303                             for (HttpConfiguration.Customizer customizer : customizers)
304                                 customizer.customize(getConnector(), _configuration, _request);
305                         }
306                         getServer().handle(this);
307                         break;
308                     }
309 
310                     case ASYNC_DISPATCH:
311                     {
312                         _request.setHandled(false);
313                         _response.getHttpOutput().reopen();
314                         _request.setDispatcherType(DispatcherType.ASYNC);
315                         getServer().handleAsync(this);
316                         break;
317                     }
318 
319                     case ERROR_DISPATCH:
320                     {
321                         Throwable ex = _state.getAsyncContextEvent().getThrowable();
322 
323                         // Check for error dispatch loops
324                         Integer loop_detect = (Integer)_request.getAttribute("org.eclipse.jetty.server.ERROR_DISPATCH");
325                         if (loop_detect==null)
326                             loop_detect=1;
327                         else
328                             loop_detect=loop_detect+1;
329                         _request.setAttribute("org.eclipse.jetty.server.ERROR_DISPATCH",loop_detect);
330                         if (loop_detect > getHttpConfiguration().getMaxErrorDispatches())
331                         {
332                             LOG.warn("ERROR_DISPATCH loop detected on {} {}",_request,ex);
333                             try
334                             {
335                                 _response.sendError(HttpStatus.INTERNAL_SERVER_ERROR_500);
336                             }
337                             finally
338                             {
339                                 _state.errorComplete();
340                             }
341                             break loop;
342                         }
343 
344                         _request.setHandled(false);
345                         _response.resetBuffer();
346                         _response.getHttpOutput().reopen();
347                         _request.setDispatcherType(DispatcherType.ERROR);
348 
349                         String reason;
350                         if (ex == null || ex instanceof TimeoutException)
351                         {
352                             reason = "Async Timeout";
353                         }
354                         else
355                         {
356                             reason = HttpStatus.Code.INTERNAL_SERVER_ERROR.getMessage();
357                             _request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, ex);
358                         }
359 
360                         _request.setAttribute(RequestDispatcher.ERROR_STATUS_CODE, 500);
361                         _request.setAttribute(RequestDispatcher.ERROR_MESSAGE, reason);
362                         _request.setAttribute(RequestDispatcher.ERROR_REQUEST_URI, _request.getRequestURI());
363 
364                         _response.setStatusWithReason(HttpStatus.INTERNAL_SERVER_ERROR_500, reason);
365 
366                         ErrorHandler eh = ErrorHandler.getErrorHandler(getServer(), _state.getContextHandler());
367                         if (eh instanceof ErrorHandler.ErrorPageMapper)
368                         {
369                             String error_page = ((ErrorHandler.ErrorPageMapper)eh).getErrorPage((HttpServletRequest)_state.getAsyncContextEvent().getSuppliedRequest());
370                             if (error_page != null)
371                                 _state.getAsyncContextEvent().setDispatchPath(error_page);
372                         }
373 
374                         getServer().handleAsync(this);
375                         break;
376                     }
377 
378                     case READ_CALLBACK:
379                     {
380                         ContextHandler handler=_state.getContextHandler();
381                         if (handler!=null)
382                             handler.handle(_request,_request.getHttpInput());
383                         else
384                             _request.getHttpInput().run();
385                         break;
386                     }
387 
388                     case WRITE_CALLBACK:
389                     {
390                         ContextHandler handler=_state.getContextHandler();
391                         if (handler!=null)
392                             handler.handle(_request,_response.getHttpOutput());
393                         else
394                             _response.getHttpOutput().run();
395                         break;
396                     }
397 
398                     case ASYNC_ERROR:
399                     {
400                         _state.onError();
401                         break;
402                     }
403 
404                     case COMPLETE:
405                     {
406                         // TODO do onComplete here for continuations to work
407 //                        _state.onComplete();
408 
409                         if (!_response.isCommitted() && !_request.isHandled())
410                             _response.sendError(404);
411                         else
412                             _response.closeOutput();
413                         _request.setHandled(true);
414 
415                         // TODO do onComplete here to detect errors in final flush
416                          _state.onComplete();
417 
418                         onCompleted();
419 
420                         break loop;
421                     }
422 
423                     default:
424                     {
425                         throw new IllegalStateException("state="+_state);
426                     }
427                 }
428             }
429             catch (EofException|QuietServletException|BadMessageException e)
430             {
431                 if (LOG.isDebugEnabled())
432                     LOG.debug(e);
433                 handleException(e);
434             }
435             catch (Throwable e)
436             {
437                 if ("ContinuationThrowable".equals(e.getClass().getSimpleName()))
438                 {
439                     LOG.ignore(e);
440                 }
441                 else
442                 {
443                     if (_connector.isStarted())
444                         LOG.warn(String.valueOf(_request.getHttpURI()), e);
445                     else
446                         LOG.debug(String.valueOf(_request.getHttpURI()), e);
447                     handleException(e);
448                 }
449             }
450             finally
451             {
452                 _request.setDispatcherType(null);
453             }
454 
455             action = _state.unhandle();
456         }
457 
458         if (LOG.isDebugEnabled())
459             LOG.debug("{} handle exit, result {}", this, action);
460 
461         boolean suspended=action==Action.WAIT;
462         return !suspended;
463     }
464 
465     /**
466      * <p>Sends an error 500, performing a special logic to detect whether the request is suspended,
467      * to avoid concurrent writes from the application.</p>
468      * <p>It may happen that the application suspends, and then throws an exception, while an application
469      * spawned thread writes the response content; in such case, we attempt to commit the error directly
470      * bypassing the {@link ErrorHandler} mechanisms and the response OutputStream.</p>
471      *
472      * @param x the Throwable that caused the problem
473      */
474     protected void handleException(Throwable x)
475     {
476         if (_state.isAsyncStarted())
477         {
478             // Handle exception via AsyncListener onError
479             Throwable root = _state.getAsyncContextEvent().getThrowable();
480             if (root==null)
481             {
482                 _state.error(x);
483             }
484             else
485             {
486                 // TODO Can this happen?  Should this just be ISE???
487                 // We've already processed an error before!
488                 root.addSuppressed(x);
489                 LOG.warn("Error while handling async error: ", root);
490                 abort(x);
491                 _state.errorComplete();
492             }
493         }
494         else
495         {
496             try
497             {
498                 // Handle error normally
499                 _request.setHandled(true);
500                 _request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, x);
501                 _request.setAttribute(RequestDispatcher.ERROR_EXCEPTION_TYPE, x.getClass());
502 
503                 if (isCommitted())
504                 {
505                     abort(x);
506                     if (LOG.isDebugEnabled())
507                         LOG.debug("Could not send response error 500, already committed", x);
508                 }
509                 else
510                 {
511                     _response.setHeader(HttpHeader.CONNECTION.asString(), HttpHeaderValue.CLOSE.asString());
512 
513                     if (x instanceof BadMessageException)
514                     {
515                         BadMessageException bme = (BadMessageException)x;
516                         _response.sendError(bme.getCode(), bme.getReason());
517                     }
518                     else if (x instanceof UnavailableException)
519                     {
520                         if (((UnavailableException)x).isPermanent())
521                             _response.sendError(HttpStatus.NOT_FOUND_404);
522                         else
523                             _response.sendError(HttpStatus.SERVICE_UNAVAILABLE_503);
524                     }
525                     else
526                         _response.sendError(HttpStatus.INTERNAL_SERVER_ERROR_500);
527                 }
528             }
529             catch (Throwable e)
530             {
531                 abort(e);
532                 if (LOG.isDebugEnabled())
533                     LOG.debug("Could not commit response error 500", e);
534             }
535         }
536     }
537 
538     public boolean isExpecting100Continue()
539     {
540         return false;
541     }
542 
543     public boolean isExpecting102Processing()
544     {
545         return false;
546     }
547 
548     @Override
549     public String toString()
550     {
551         return String.format("%s@%x{r=%s,c=%b,a=%s,uri=%s}",
552                 getClass().getSimpleName(),
553                 hashCode(),
554                 _requests,
555                 _committed.get(),
556                 _state.getState(),
557                 _request.getHttpURI());
558     }
559 
560     public void onRequest(MetaData.Request request)
561     {
562         _requests.incrementAndGet();
563         _request.setTimeStamp(System.currentTimeMillis());
564         HttpFields fields = _response.getHttpFields();
565         if (_configuration.getSendDateHeader() && !fields.contains(HttpHeader.DATE))
566             fields.put(_connector.getServer().getDateField());
567 
568         _request.setMetaData(request);
569     }
570 
571     public boolean onContent(HttpInput.Content content)
572     {
573         if (LOG.isDebugEnabled())
574             LOG.debug("{} content {}", this, content);
575 
576         return _request.getHttpInput().addContent(content);
577     }
578 
579     public boolean onRequestComplete()
580     {
581         if (LOG.isDebugEnabled())
582             LOG.debug("{} onRequestComplete", this);
583         return _request.getHttpInput().eof();
584     }
585 
586     public void onCompleted()
587     {
588         if (_requestLog!=null )
589             _requestLog.log(_request, _response);
590 
591         _transport.onCompleted();
592     }
593 
594     public boolean onEarlyEOF()
595     {
596         return _request.getHttpInput().earlyEOF();
597     }
598 
599     public void onBadMessage(int status, String reason)
600     {
601         if (status < 400 || status > 599)
602             status = HttpStatus.BAD_REQUEST_400;
603 
604         try
605         {
606             if (_state.handling()==Action.DISPATCH)
607             {
608                 ByteBuffer content=null;
609                 HttpFields fields=new HttpFields();
610 
611                 ErrorHandler handler=getServer().getBean(ErrorHandler.class);
612                 if (handler!=null)
613                     content=handler.badMessageError(status,reason,fields);
614 
615                 sendResponse(new MetaData.Response(HttpVersion.HTTP_1_1,status,reason,fields,0),content ,true);
616             }
617         }
618         catch (IOException e)
619         {
620             LOG.debug(e);
621         }
622         finally
623         {
624             // TODO: review whether it's the right state to check.
625             if (_state.unhandle()==Action.COMPLETE)
626                 _state.onComplete();
627             else
628                 throw new IllegalStateException(); // TODO: don't throw from finally blocks !
629             onCompleted();
630         }
631     }
632 
633     protected boolean sendResponse(MetaData.Response info, ByteBuffer content, boolean complete, final Callback callback)
634     {
635         boolean committing = _committed.compareAndSet(false, true);
636         if (committing)
637         {
638             // We need an info to commit
639             if (info==null)
640                 info = _response.newResponseMetaData();
641             commit(info);
642 
643             // wrap callback to process 100 responses
644             final int status=info.getStatus();
645             final Callback committed = (status<200&&status>=100)?new Commit100Callback(callback):new CommitCallback(callback);
646 
647             // committing write
648             _transport.send(info, _request.isHead(), content, complete, committed);
649         }
650         else if (info==null)
651         {
652             // This is a normal write
653             _transport.send(null,_request.isHead(), content, complete, callback);
654         }
655         else
656         {
657             callback.failed(new IllegalStateException("committed"));
658         }
659         return committing;
660     }
661 
662     protected boolean sendResponse(MetaData.Response info, ByteBuffer content, boolean complete) throws IOException
663     {
664         try(Blocker blocker = _response.getHttpOutput().acquireWriteBlockingCallback())
665         {
666             boolean committing = sendResponse(info,content,complete,blocker);
667             blocker.block();
668             return committing;
669         }
670         catch (Throwable failure)
671         {
672             if (LOG.isDebugEnabled())
673                 LOG.debug(failure);
674             abort(failure);
675             throw failure;
676         }
677     }
678 
679     protected void commit (MetaData.Response info)
680     {
681         _committedMetaData=info;
682         if (LOG.isDebugEnabled())
683             LOG.debug("Commit {} to {}",info,this);
684     }
685 
686     public boolean isCommitted()
687     {
688         return _committed.get();
689     }
690 
691     /**
692      * <p>Non-Blocking write, committing the response if needed.</p>
693      * Called as last link in HttpOutput.Filter chain
694      * @param content  the content buffer to write
695      * @param complete whether the content is complete for the response
696      * @param callback Callback when complete or failed
697      */
698     @Override
699     public void write(ByteBuffer content, boolean complete, Callback callback)
700     {
701         _written+=BufferUtil.length(content);
702         sendResponse(null,content,complete,callback);
703     }
704 
705     public HttpOutput.Interceptor getNextInterceptor()
706     {
707         return null;
708     }
709 
710     protected void execute(Runnable task)
711     {
712         _connector.getExecutor().execute(task);
713     }
714 
715     public Scheduler getScheduler()
716     {
717         return _connector.getScheduler();
718     }
719 
720     /**
721      * @return true if the HttpChannel can efficiently use direct buffer (typically this means it is not over SSL or a multiplexed protocol)
722      */
723     public boolean useDirectBuffers()
724     {
725         return getEndPoint() instanceof ChannelEndPoint;
726     }
727 
728     /**
729      * If a write or similar operation to this channel fails,
730      * then this method should be called.
731      * <p>
732      * The standard implementation calls {@link HttpTransport#abort(Throwable)}.
733      *
734      * @param failure the failure that caused the abort.
735      */
736     public void abort(Throwable failure)
737     {
738         _transport.abort(failure);
739     }
740 
741     private class CommitCallback implements Callback
742     {
743         private final Callback _callback;
744 
745         private CommitCallback(Callback callback)
746         {
747             _callback = callback;
748         }
749 
750         @Override
751         public boolean isNonBlocking()
752         {
753             return _callback.isNonBlocking();
754         }
755 
756         @Override
757         public void succeeded()
758         {
759             _callback.succeeded();
760         }
761 
762         @Override
763         public void failed(final Throwable x)
764         {
765             if (LOG.isDebugEnabled())
766                 LOG.debug("Commit failed", x);
767 
768             if (x instanceof EofException || x instanceof ClosedChannelException)
769             {
770                 _callback.failed(x);
771                 _response.getHttpOutput().closed();
772             }
773             else
774             {
775                 _transport.send(HttpGenerator.RESPONSE_500_INFO, false, null, true, new Callback()
776                 {
777                     @Override
778                     public void succeeded()
779                     {
780                         _callback.failed(x);
781                         _response.getHttpOutput().closed();
782                     }
783 
784                     @Override
785                     public void failed(Throwable th)
786                     {
787                         _callback.failed(x);
788                         _response.getHttpOutput().closed();
789                     }
790                 });
791             }
792         }
793     }
794 
795     private class Commit100Callback extends CommitCallback
796     {
797         private Commit100Callback(Callback callback)
798         {
799             super(callback);
800         }
801 
802         @Override
803         public void succeeded()
804         {
805             if (_committed.compareAndSet(true, false))
806                 super.succeeded();
807             else
808                 super.failed(new IllegalStateException());
809         }
810 
811     }
812 
813 
814 }