View Javadoc

1   // ========================================================================
2   // Copyright (c) 2007-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.server;
15  
16  import java.util.ArrayList;
17  import java.util.List;
18  
19  import javax.servlet.ServletContext;
20  import javax.servlet.ServletRequest;
21  import javax.servlet.ServletResponse;
22  
23  import org.eclipse.jetty.continuation.Continuation;
24  import org.eclipse.jetty.continuation.ContinuationListener;
25  import org.eclipse.jetty.continuation.ContinuationThrowable;
26  import org.eclipse.jetty.io.AsyncEndPoint;
27  import org.eclipse.jetty.io.EndPoint;
28  import org.eclipse.jetty.server.handler.ContextHandler;
29  import org.eclipse.jetty.server.handler.ContextHandler.Context;
30  import org.eclipse.jetty.util.log.Log;
31  import org.eclipse.jetty.util.log.Logger;
32  import org.eclipse.jetty.util.thread.Timeout;
33  
34  /* ------------------------------------------------------------ */
35  /** Implementation of Continuation and AsyncContext interfaces.
36   * 
37   */
38  public class AsyncContinuation implements AsyncContext, Continuation
39  {
40      private static final Logger LOG = Log.getLogger(AsyncContinuation.class);
41  
42      private final static long DEFAULT_TIMEOUT=30000L;
43      
44      private final static ContinuationThrowable __exception = new ContinuationThrowable();
45      
46      // STATES:
47      //               handling()    suspend()     unhandle()    resume()       complete()  doComplete()
48      //                             startAsync()                dispatch()   
49      // IDLE          DISPATCHED      
50      // DISPATCHED                  ASYNCSTARTED  UNCOMPLETED
51      // ASYNCSTARTED                              ASYNCWAIT     REDISPATCHING  COMPLETING
52      // REDISPATCHING                             REDISPATCHED  
53      // ASYNCWAIT                                               REDISPATCH     COMPLETING
54      // REDISPATCH    REDISPATCHED
55      // REDISPATCHED                ASYNCSTARTED  UNCOMPLETED
56      // COMPLETING    UNCOMPLETED                 UNCOMPLETED
57      // UNCOMPLETED                                                                        COMPLETED
58      // COMPLETED
59      private static final int __IDLE=0;         // Idle request
60      private static final int __DISPATCHED=1;   // Request dispatched to filter/servlet
61      private static final int __ASYNCSTARTED=2; // Suspend called, but not yet returned to container
62      private static final int __REDISPATCHING=3;// resumed while dispatched
63      private static final int __ASYNCWAIT=4;    // Suspended and parked
64      private static final int __REDISPATCH=5;   // Has been scheduled
65      private static final int __REDISPATCHED=6; // Request redispatched to filter/servlet
66      private static final int __COMPLETING=7;   // complete while dispatched
67      private static final int __UNCOMPLETED=8;  // Request is completable
68      private static final int __COMPLETED=9;    // Request is complete
69      
70  
71      /* ------------------------------------------------------------ */
72      protected AbstractHttpConnection _connection;
73      private List<ContinuationListener> _continuationListeners;
74  
75      /* ------------------------------------------------------------ */
76      private int _state;
77      private boolean _initial;
78      private boolean _resumed;
79      private boolean _expired;
80      private volatile boolean _responseWrapped;
81      private long _timeoutMs=DEFAULT_TIMEOUT;
82      private AsyncEventState _event;
83      private volatile long _expireAt;
84      
85      /* ------------------------------------------------------------ */
86      protected AsyncContinuation()
87      {
88          _state=__IDLE;
89          _initial=true;
90      }
91  
92      /* ------------------------------------------------------------ */
93      protected void setConnection(final AbstractHttpConnection connection)
94      {
95          synchronized(this)
96          {
97              _connection=connection;
98          }
99      }
100 
101     /* ------------------------------------------------------------ */
102     public void addContinuationListener(ContinuationListener listener)
103     {
104         synchronized(this)
105         {
106             if (_continuationListeners==null)
107                 _continuationListeners=new ArrayList<ContinuationListener>();
108             _continuationListeners.add(listener);
109         }
110     }
111 
112     /* ------------------------------------------------------------ */
113     public void setTimeout(long ms)
114     {
115         synchronized(this)
116         {
117             _timeoutMs=ms;
118         }
119     } 
120 
121     /* ------------------------------------------------------------ */
122     public long getTimeout()
123     {
124         synchronized(this)
125         {
126             return _timeoutMs;
127         }
128     } 
129 
130     /* ------------------------------------------------------------ */
131     public AsyncEventState getAsyncEventState()
132     {
133         synchronized(this)
134         {
135             return _event;
136         }
137     } 
138    
139     /* ------------------------------------------------------------ */
140     /**
141      * @see org.eclipse.jetty.continuation.Continuation#keepWrappers()
142      */
143 
144     /* ------------------------------------------------------------ */
145     /**
146      * @see org.eclipse.jetty.continuation.Continuation#isResponseWrapped()
147      */
148     public boolean isResponseWrapped()
149     {
150         return _responseWrapped;
151     }
152 
153     /* ------------------------------------------------------------ */
154     /* (non-Javadoc)
155      * @see javax.servlet.ServletRequest#isInitial()
156      */
157     public boolean isInitial()
158     {
159         synchronized(this)
160         {
161             return _initial;
162         }
163     }
164     
165     /* ------------------------------------------------------------ */
166     /* (non-Javadoc)
167      * @see javax.servlet.ServletRequest#isSuspended()
168      */
169     public boolean isSuspended()
170     {
171         synchronized(this)
172         {
173             switch(_state)
174             {
175                 case __ASYNCSTARTED:
176                 case __REDISPATCHING:
177                 case __COMPLETING:
178                 case __ASYNCWAIT:
179                     return true;
180                     
181                 default:
182                     return false;   
183             }
184         }
185     }
186     
187     /* ------------------------------------------------------------ */
188     /* (non-Javadoc)
189      * @see javax.servlet.ServletRequest#isSuspended()
190      */
191     public boolean isSuspending()
192     {
193         synchronized(this)
194         {
195             switch(_state)
196             {
197                 case __ASYNCSTARTED:
198                 case __ASYNCWAIT:
199                     return true;
200                     
201                 default:
202                     return false;   
203             }
204         }
205     }
206 
207     /* ------------------------------------------------------------ */
208     @Override
209     public String toString()
210     {
211         synchronized (this)
212         {
213             return super.toString()+"@"+getStatusString();
214         }
215     }
216 
217     /* ------------------------------------------------------------ */
218     public String getStatusString()
219     {
220         synchronized (this)
221         {
222             return
223             ((_state==__IDLE)?"IDLE":
224                 (_state==__DISPATCHED)?"DISPATCHED":
225                     (_state==__ASYNCSTARTED)?"ASYNCSTARTED":
226                         (_state==__ASYNCWAIT)?"ASYNCWAIT":
227                             (_state==__REDISPATCHING)?"REDISPATCHING":
228                                 (_state==__REDISPATCH)?"REDISPATCH":
229                                     (_state==__REDISPATCHED)?"REDISPATCHED":
230                                         (_state==__COMPLETING)?"COMPLETING":
231                                             (_state==__UNCOMPLETED)?"UNCOMPLETED":
232                                                 (_state==__COMPLETED)?"COMPLETE":
233                                                     ("UNKNOWN?"+_state))+
234             (_initial?",initial":"")+
235             (_resumed?",resumed":"")+
236             (_expired?",expired":"");
237         }
238     }
239 
240     /* ------------------------------------------------------------ */
241     /**
242      * @return false if the handling of the request should not proceed
243      */
244     protected boolean handling()
245     {
246         synchronized (this)
247         {
248             _responseWrapped=false;
249             
250             switch(_state)
251             {
252                 case __IDLE:
253                     _initial=true;
254                     _state=__DISPATCHED;
255                     return true;
256                     
257                 case __COMPLETING:
258                     _state=__UNCOMPLETED;
259                     return false;
260 
261                 case __ASYNCWAIT:
262                     return false;
263                     
264                 case __REDISPATCH:
265                     _state=__REDISPATCHED;
266                     return true;
267 
268                 default:
269                     throw new IllegalStateException(this.getStatusString());
270             }
271         }
272     }
273 
274     /* ------------------------------------------------------------ */
275     /* (non-Javadoc)
276      * @see javax.servlet.ServletRequest#suspend(long)
277      */
278     protected void suspend(final ServletContext context,
279             final ServletRequest request,
280             final ServletResponse response)
281     {
282         synchronized (this)
283         {
284             switch(_state)
285             {
286                 case __DISPATCHED:
287                 case __REDISPATCHED:
288                     _resumed=false;
289                     _expired=false;
290 
291                     if (_event==null || request!=_event.getRequest() || response != _event.getResponse() || context != _event.getServletContext())
292                         _event=new AsyncEventState(context,request,response);
293                     else
294                     {
295                         _event._dispatchContext=null;
296                         _event._path=null;
297                     }
298 
299                     _state=__ASYNCSTARTED;
300                     break;
301 
302                 default:
303                     throw new IllegalStateException(this.getStatusString());
304             }
305         }
306         
307     }
308 
309     /* ------------------------------------------------------------ */
310     /**
311      * Signal that the HttpConnection has finished handling the request.
312      * For blocking connectors, this call may block if the request has
313      * been suspended (startAsync called).
314      * @return true if handling is complete, false if the request should 
315      * be handled again (eg because of a resume that happened before unhandle was called)
316      */
317     protected boolean unhandle()
318     {
319         synchronized (this)
320         {
321             List<ContinuationListener> listeners=_continuationListeners;
322             
323             switch(_state)
324             {
325                 case __REDISPATCHED:
326                 case __DISPATCHED:
327                     _state=__UNCOMPLETED;
328                     return true;
329 
330                 case __IDLE:
331                     throw new IllegalStateException(this.getStatusString());
332 
333                 case __ASYNCSTARTED:
334                     _initial=false;
335                     _state=__ASYNCWAIT;
336                     scheduleTimeout(); // could block and change state.
337                     if (_state==__ASYNCWAIT)
338                         return true;
339                     else if (_state==__COMPLETING)
340                     {
341                         _state=__UNCOMPLETED;
342                         return true;
343                     }
344                     _initial=false;
345                     _state=__REDISPATCHED;
346                     return false; 
347 
348                 case __REDISPATCHING:
349                     _initial=false;
350                     _state=__REDISPATCHED;
351                     return false; 
352 
353                 case __COMPLETING:
354                     _initial=false;
355                     _state=__UNCOMPLETED;
356                     return true;
357 
358                 default:
359                     throw new IllegalStateException(this.getStatusString());
360             }
361         }
362     }
363 
364     /* ------------------------------------------------------------ */
365     public void dispatch()
366     {
367         boolean dispatch=false;
368         synchronized (this)
369         {
370             switch(_state)
371             {
372                 case __ASYNCSTARTED:
373                     _state=__REDISPATCHING;
374                     _resumed=true;
375                     return;
376 
377                 case __ASYNCWAIT:
378                     dispatch=!_expired;
379                     _state=__REDISPATCH;
380                     _resumed=true;
381                     break;
382                     
383                 case __REDISPATCH:
384                     return;
385                     
386                 default:
387                     throw new IllegalStateException(this.getStatusString());
388             }
389         }
390         
391         if (dispatch)
392         {
393             cancelTimeout();
394             scheduleDispatch();
395         }
396     }
397 
398     /* ------------------------------------------------------------ */
399     protected void expired()
400     {
401         final List<ContinuationListener> listeners;
402         synchronized (this)
403         {
404             switch(_state)
405             {
406                 case __ASYNCSTARTED:
407                 case __ASYNCWAIT:
408                     listeners=_continuationListeners;
409                     break;
410                 default:
411                     listeners=null;
412                     return;
413             }
414             _expired=true;
415         }
416         
417         if (listeners!=null)
418         {
419             for (int i=0;i<listeners.size();i++)
420             {
421                 ContinuationListener listener=listeners.get(i);
422                 try
423                 {
424                     listener.onTimeout(this);
425                 }
426                 catch(Exception e)
427                 {
428                     LOG.warn(e);
429                 }
430             }
431         }
432         
433         synchronized (this)
434         {
435             switch(_state)
436             {
437                 case __ASYNCSTARTED:
438                 case __ASYNCWAIT:
439                     dispatch();
440             }
441         }
442 
443         scheduleDispatch();
444     }
445     
446     /* ------------------------------------------------------------ */
447     /* (non-Javadoc)
448      * @see javax.servlet.ServletRequest#complete()
449      */
450     public void complete()
451     {
452         // just like resume, except don't set _resumed=true;
453         boolean dispatch=false;
454         synchronized (this)
455         {
456             switch(_state)
457             {
458                 case __DISPATCHED:
459                 case __REDISPATCHED:
460                     throw new IllegalStateException(this.getStatusString());
461 
462                 case __ASYNCSTARTED:
463                     _state=__COMPLETING;
464                     return;
465                     
466                 case __ASYNCWAIT:
467                     _state=__COMPLETING;
468                     dispatch=!_expired;
469                     break;
470                     
471                 default:
472                     throw new IllegalStateException(this.getStatusString());
473             }
474         }
475         
476         if (dispatch)
477         {
478             cancelTimeout();
479             scheduleDispatch();
480         }
481     }
482 
483     
484     /* ------------------------------------------------------------ */
485     /* (non-Javadoc)
486      * @see javax.servlet.ServletRequest#complete()
487      */
488     protected void doComplete()
489     {
490         final List<ContinuationListener> listeners;
491         synchronized (this)
492         {
493             switch(_state)
494             {
495                 case __UNCOMPLETED:
496                     _state=__COMPLETED;
497                     listeners=_continuationListeners;
498                     break;
499                     
500                 default:
501                     listeners=null;
502                     throw new IllegalStateException(this.getStatusString());
503             }
504         }
505         
506         if (listeners!=null)
507         {
508             for(int i=0;i<listeners.size();i++)
509             {
510                 try
511                 {
512                     listeners.get(i).onComplete(this);
513                 }
514                 catch(Exception e)
515                 {
516                     LOG.warn(e);
517                 }
518             }
519         }
520     }
521 
522     /* ------------------------------------------------------------ */
523     protected void recycle()
524     {
525         synchronized (this)
526         {
527 //            _history.append("r\n");
528             switch(_state)
529             {
530                 case __DISPATCHED:
531                 case __REDISPATCHED:
532                     throw new IllegalStateException(getStatusString());
533                 default:
534                     _state=__IDLE;
535             }
536             _initial = true;
537             _resumed=false;
538             _expired=false;
539             _responseWrapped=false;
540             cancelTimeout();
541             _timeoutMs=DEFAULT_TIMEOUT;
542             _continuationListeners=null;
543         }
544     }    
545     
546     /* ------------------------------------------------------------ */
547     public void cancel()
548     {
549         synchronized (this)
550         {
551             cancelTimeout();
552             _continuationListeners=null;
553         }
554     }
555 
556     /* ------------------------------------------------------------ */
557     protected void scheduleDispatch()
558     {
559         EndPoint endp=_connection.getEndPoint();
560         if (!endp.isBlocking())
561         {
562             ((AsyncEndPoint)endp).asyncDispatch();
563         }
564     }
565 
566     /* ------------------------------------------------------------ */
567     protected void scheduleTimeout()
568     {
569         EndPoint endp=_connection.getEndPoint();
570         if (_timeoutMs>0)
571         {
572             if (endp.isBlocking())
573             {
574                 synchronized(this)
575                 {
576                     _expireAt = System.currentTimeMillis()+_timeoutMs;
577                     long wait=_timeoutMs;
578                     while (_expireAt>0 && wait>0 && _connection.getServer().isRunning())
579                     {
580                         try
581                         {
582                             this.wait(wait);
583                         }
584                         catch (InterruptedException e)
585                         {
586                             LOG.ignore(e);
587                         }
588                         wait=_expireAt-System.currentTimeMillis();
589                     }
590 
591                     if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning())
592                     {
593                         expired();
594                     }
595                 }            
596             }
597             else
598             {
599                 ((AsyncEndPoint)endp).scheduleTimeout(_event,_timeoutMs);
600             }
601         }
602     }
603 
604     /* ------------------------------------------------------------ */
605     protected void cancelTimeout()
606     {
607         EndPoint endp=_connection.getEndPoint();
608         if (endp.isBlocking())
609         {
610             synchronized(this)
611             {
612                 _expireAt=0;
613                 this.notifyAll();
614             }
615         }
616         else 
617         {
618             final AsyncEventState event=_event;
619             if (event!=null)
620             {
621                 ((AsyncEndPoint)endp).cancelTimeout(event);
622             }
623         }
624     }
625 
626     /* ------------------------------------------------------------ */
627     public boolean isCompleting()
628     {
629         synchronized (this)
630         {
631             return _state==__COMPLETING;
632         }
633     }
634     
635     /* ------------------------------------------------------------ */
636     boolean isUncompleted()
637     {
638         synchronized (this)
639         {
640             return _state==__UNCOMPLETED;
641         }
642     } 
643     
644     /* ------------------------------------------------------------ */
645     public boolean isComplete()
646     {
647         synchronized (this)
648         {
649             return _state==__COMPLETED;
650         }
651     }
652 
653 
654     /* ------------------------------------------------------------ */
655     public boolean isAsyncStarted()
656     {
657         synchronized (this)
658         {
659             switch(_state)
660             {
661                 case __ASYNCSTARTED:
662                 case __REDISPATCHING:
663                 case __REDISPATCH:
664                 case __ASYNCWAIT:
665                     return true;
666 
667                 default:
668                     return false;
669             }
670         }
671     }
672 
673 
674     /* ------------------------------------------------------------ */
675     public boolean isAsync()
676     {
677         synchronized (this)
678         {
679             switch(_state)
680             {
681                 case __IDLE:
682                 case __DISPATCHED:
683                 case __UNCOMPLETED:
684                 case __COMPLETED:
685                     return false;
686 
687                 default:
688                     return true;
689             }
690         }
691     }
692 
693     /* ------------------------------------------------------------ */
694     public void dispatch(ServletContext context, String path)
695     {
696         _event._dispatchContext=context;
697         _event._path=path;
698         dispatch();
699     }
700 
701     /* ------------------------------------------------------------ */
702     public void dispatch(String path)
703     {
704         _event._path=path;
705         dispatch();
706     }
707 
708     /* ------------------------------------------------------------ */
709     public Request getBaseRequest()
710     {
711         return _connection.getRequest();
712     }
713     
714     /* ------------------------------------------------------------ */
715     public ServletRequest getRequest()
716     {
717         if (_event!=null)
718             return _event.getRequest();
719         return _connection.getRequest();
720     }
721 
722     /* ------------------------------------------------------------ */
723     public ServletResponse getResponse()
724     {
725         if (_event!=null)
726             return _event.getResponse();
727         return _connection.getResponse();
728     }
729 
730     /* ------------------------------------------------------------ */
731     public void start(final Runnable run)
732     {
733         final AsyncEventState event=_event;
734         if (event!=null)
735         {
736             _connection.getServer().getThreadPool().dispatch(new Runnable()
737             {
738                 public void run()
739                 {
740                     ((Context)event.getServletContext()).getContextHandler().handle(run);
741                 }
742             });
743         }
744     }
745 
746     /* ------------------------------------------------------------ */
747     public boolean hasOriginalRequestAndResponse()
748     {
749         synchronized (this)
750         {
751             return (_event!=null && _event.getRequest()==_connection._request && _event.getResponse()==_connection._response);
752         }
753     }
754 
755     /* ------------------------------------------------------------ */
756     public ContextHandler getContextHandler()
757     {
758         final AsyncEventState event=_event;
759         if (event!=null)
760             return ((Context)event.getServletContext()).getContextHandler();
761         return null;
762     }
763 
764 
765     /* ------------------------------------------------------------ */
766     /**
767      * @see Continuation#isResumed()
768      */
769     public boolean isResumed()
770     {
771         synchronized (this)
772         {
773             return _resumed;
774         }
775     }
776     /* ------------------------------------------------------------ */
777     /**
778      * @see Continuation#isExpired()
779      */
780     public boolean isExpired()
781     {
782         synchronized (this)
783         {
784             return _expired;
785         }
786     }
787 
788     /* ------------------------------------------------------------ */
789     /**
790      * @see Continuation#resume()
791      */
792     public void resume()
793     {
794         dispatch();
795     }
796     
797     /* ------------------------------------------------------------ */
798     /**
799      * @see Continuation#suspend()
800      */
801     public void suspend(ServletResponse response)
802     {
803         _responseWrapped=!(response instanceof Response);
804         AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response); 
805     }
806 
807     /* ------------------------------------------------------------ */
808     /**
809      * @see Continuation#suspend()
810      */
811     public void suspend()
812     {
813         _responseWrapped=false;
814         AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());       
815     }
816 
817     /* ------------------------------------------------------------ */
818     /**
819      * @see org.eclipse.jetty.continuation.Continuation#getServletResponse()
820      */
821     public ServletResponse getServletResponse()
822     {
823         if (_responseWrapped && _event!=null && _event.getResponse()!=null)
824             return _event.getResponse();
825         return _connection.getResponse();
826     }
827 
828     /* ------------------------------------------------------------ */
829     /**
830      * @see org.eclipse.jetty.continuation.Continuation#getAttribute(java.lang.String)
831      */
832     public Object getAttribute(String name)
833     {
834         return _connection.getRequest().getAttribute(name);
835     }
836 
837     /* ------------------------------------------------------------ */
838     /**
839      * @see org.eclipse.jetty.continuation.Continuation#removeAttribute(java.lang.String)
840      */
841     public void removeAttribute(String name)
842     {
843         _connection.getRequest().removeAttribute(name);
844     }
845 
846     /* ------------------------------------------------------------ */
847     /**
848      * @see org.eclipse.jetty.continuation.Continuation#setAttribute(java.lang.String, java.lang.Object)
849      */
850     public void setAttribute(String name, Object attribute)
851     {
852         _connection.getRequest().setAttribute(name,attribute);
853     }
854 
855     /* ------------------------------------------------------------ */
856     /**
857      * @see org.eclipse.jetty.continuation.Continuation#undispatch()
858      */
859     public void undispatch()
860     {
861         if (isSuspended())
862         {
863             if (LOG.isDebugEnabled())
864                 throw new ContinuationThrowable();
865             else
866                 throw __exception;
867         }
868         throw new IllegalStateException("!suspended");
869     }
870 
871     /* ------------------------------------------------------------ */
872     /* ------------------------------------------------------------ */
873     public class AsyncEventState extends Timeout.Task implements Runnable
874     {
875         private final ServletContext _suspendedContext;
876         private final ServletRequest _request;
877         private final ServletResponse _response;
878         private ServletContext _dispatchContext;
879         private String _path;
880         
881         public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
882         {
883             _suspendedContext=context;
884             _request=request;
885             _response=response;
886         }
887         
888         public ServletContext getSuspendedContext()
889         {
890             return _suspendedContext;
891         }
892         
893         public ServletContext getDispatchContext()
894         {
895             return _dispatchContext;
896         }
897         
898         public ServletContext getServletContext()
899         {
900             return _dispatchContext==null?_suspendedContext:_dispatchContext;
901         }
902 
903         public ServletRequest getRequest()
904         {
905             return _request;
906         }
907 
908         public ServletResponse getResponse()
909         {
910             return _response;
911         }
912         
913         public String getPath()
914         {
915             return _path;
916         }
917 
918         @Override
919         public void expired()
920         {
921             AsyncContinuation.this.expired();
922         }
923         
924         public void run()
925         {
926             AsyncContinuation.this.expired();  
927         }
928     }
929 }