View Javadoc

1   // ========================================================================
2   // Copyright (c) 2007-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.server;
15  
16  import java.util.ArrayList;
17  import java.util.List;
18  
19  import javax.servlet.ServletContext;
20  import javax.servlet.ServletRequest;
21  import javax.servlet.ServletResponse;
22  
23  import org.eclipse.jetty.continuation.Continuation;
24  import org.eclipse.jetty.continuation.ContinuationListener;
25  import org.eclipse.jetty.continuation.ContinuationThrowable;
26  import org.eclipse.jetty.io.AsyncEndPoint;
27  import org.eclipse.jetty.io.EndPoint;
28  import org.eclipse.jetty.server.handler.ContextHandler;
29  import org.eclipse.jetty.server.handler.ContextHandler.Context;
30  import org.eclipse.jetty.util.log.Log;
31  import org.eclipse.jetty.util.thread.Timeout;
32  
33  /* ------------------------------------------------------------ */
34  /** Implementation of Continuation and AsyncContext interfaces.
35   * 
36   */
37  public class AsyncContinuation implements AsyncContext, Continuation
38  {
39      private final static long DEFAULT_TIMEOUT=30000L;
40      
41      private final static ContinuationThrowable __exception = new ContinuationThrowable();
42      
43      // STATES:
44      //               handling()    suspend()     unhandle()    resume()       complete()  doComplete()
45      //                             startAsync()                dispatch()   
46      // IDLE          DISPATCHED      
47      // DISPATCHED                  ASYNCSTARTED  UNCOMPLETED
48      // ASYNCSTARTED                              ASYNCWAIT     REDISPATCHING  COMPLETING
49      // REDISPATCHING                             REDISPATCHED  
50      // ASYNCWAIT                                               REDISPATCH     COMPLETING
51      // REDISPATCH    REDISPATCHED
52      // REDISPATCHED                ASYNCSTARTED  UNCOMPLETED
53      // COMPLETING    UNCOMPLETED                 UNCOMPLETED
54      // UNCOMPLETED                                                                        COMPLETED
55      // COMPLETED
56      private static final int __IDLE=0;         // Idle request
57      private static final int __DISPATCHED=1;   // Request dispatched to filter/servlet
58      private static final int __ASYNCSTARTED=2; // Suspend called, but not yet returned to container
59      private static final int __REDISPATCHING=3;// resumed while dispatched
60      private static final int __ASYNCWAIT=4;    // Suspended and parked
61      private static final int __REDISPATCH=5;   // Has been scheduled
62      private static final int __REDISPATCHED=6; // Request redispatched to filter/servlet
63      private static final int __COMPLETING=7;   // complete while dispatched
64      private static final int __UNCOMPLETED=8;  // Request is completable
65      private static final int __COMPLETED=9;    // Request is complete
66      
67  
68      /* ------------------------------------------------------------ */
69      protected HttpConnection _connection;
70      private List<ContinuationListener> _continuationListeners;
71  
72      /* ------------------------------------------------------------ */
73      private int _state;
74      private boolean _initial;
75      private boolean _resumed;
76      private boolean _expired;
77      private volatile boolean _responseWrapped;
78      private long _timeoutMs=DEFAULT_TIMEOUT;
79      private AsyncEventState _event;
80      private volatile long _expireAt;
81      
82      /* ------------------------------------------------------------ */
83      protected AsyncContinuation()
84      {
85          _state=__IDLE;
86          _initial=true;
87      }
88  
89      /* ------------------------------------------------------------ */
90      protected void setConnection(final HttpConnection connection)
91      {
92          synchronized(this)
93          {
94              _connection=connection;
95          }
96      }
97  
98      /* ------------------------------------------------------------ */
99      public void addContinuationListener(ContinuationListener listener)
100     {
101         synchronized(this)
102         {
103             if (_continuationListeners==null)
104                 _continuationListeners=new ArrayList<ContinuationListener>();
105             _continuationListeners.add(listener);
106         }
107     }
108 
109     /* ------------------------------------------------------------ */
110     public void setTimeout(long ms)
111     {
112         synchronized(this)
113         {
114             _timeoutMs=ms;
115         }
116     } 
117 
118     /* ------------------------------------------------------------ */
119     public long getTimeout()
120     {
121         synchronized(this)
122         {
123             return _timeoutMs;
124         }
125     } 
126 
127     /* ------------------------------------------------------------ */
128     public AsyncEventState getAsyncEventState()
129     {
130         synchronized(this)
131         {
132             return _event;
133         }
134     } 
135    
136     /* ------------------------------------------------------------ */
137     /**
138      * @see org.eclipse.jetty.continuation.Continuation#keepWrappers()
139      */
140 
141     /* ------------------------------------------------------------ */
142     /**
143      * @see org.eclipse.jetty.continuation.Continuation#isResponseWrapped()
144      */
145     public boolean isResponseWrapped()
146     {
147         return _responseWrapped;
148     }
149 
150     /* ------------------------------------------------------------ */
151     /* (non-Javadoc)
152      * @see javax.servlet.ServletRequest#isInitial()
153      */
154     public boolean isInitial()
155     {
156         synchronized(this)
157         {
158             return _initial;
159         }
160     }
161     
162     /* ------------------------------------------------------------ */
163     /* (non-Javadoc)
164      * @see javax.servlet.ServletRequest#isSuspended()
165      */
166     public boolean isSuspended()
167     {
168         synchronized(this)
169         {
170             switch(_state)
171             {
172                 case __ASYNCSTARTED:
173                 case __REDISPATCHING:
174                 case __COMPLETING:
175                 case __ASYNCWAIT:
176                     return true;
177                     
178                 default:
179                     return false;   
180             }
181         }
182     }
183 
184     /* ------------------------------------------------------------ */
185     @Override
186     public String toString()
187     {
188         synchronized (this)
189         {
190             return super.toString()+"@"+getStatusString();
191         }
192     }
193 
194     /* ------------------------------------------------------------ */
195     public String getStatusString()
196     {
197         synchronized (this)
198         {
199             return
200             ((_state==__IDLE)?"IDLE":
201                 (_state==__DISPATCHED)?"DISPATCHED":
202                     (_state==__ASYNCSTARTED)?"ASYNCSTARTED":
203                         (_state==__ASYNCWAIT)?"ASYNCWAIT":
204                             (_state==__REDISPATCHING)?"REDISPATCHING":
205                                 (_state==__REDISPATCH)?"REDISPATCH":
206                                     (_state==__REDISPATCHED)?"REDISPATCHED":
207                                         (_state==__COMPLETING)?"COMPLETING":
208                                             (_state==__UNCOMPLETED)?"UNCOMPLETED":
209                                                 (_state==__COMPLETED)?"COMPLETE":
210                                                     ("UNKNOWN?"+_state))+
211             (_initial?",initial":"")+
212             (_resumed?",resumed":"")+
213             (_expired?",expired":"");
214         }
215     }
216 
217     /* ------------------------------------------------------------ */
218     /**
219      * @return false if the handling of the request should not proceed
220      */
221     protected boolean handling()
222     {
223         synchronized (this)
224         {
225             _responseWrapped=false;
226             
227             switch(_state)
228             {
229                 case __IDLE:
230                     _initial=true;
231                     _state=__DISPATCHED;
232                     return true;
233                     
234                 case __COMPLETING:
235                     _state=__UNCOMPLETED;
236                     return false;
237 
238                 case __ASYNCWAIT:
239                     return false;
240                     
241                 case __REDISPATCH:
242                     _state=__REDISPATCHED;
243                     return true;
244 
245                 default:
246                     throw new IllegalStateException(this.getStatusString());
247             }
248         }
249     }
250 
251     /* ------------------------------------------------------------ */
252     /* (non-Javadoc)
253      * @see javax.servlet.ServletRequest#suspend(long)
254      */
255     protected void suspend(final ServletContext context,
256             final ServletRequest request,
257             final ServletResponse response)
258     {
259         synchronized (this)
260         {
261             switch(_state)
262             {
263                 case __DISPATCHED:
264                 case __REDISPATCHED:
265                     _resumed=false;
266                     _expired=false;
267 
268                     if (_event==null || request!=_event.getRequest() || response != _event.getResponse() || context != _event.getServletContext())
269                         _event=new AsyncEventState(context,request,response);
270                     else
271                     {
272                         _event._dispatchContext=null;
273                         _event._path=null;
274                     }
275 
276                     _state=__ASYNCSTARTED;
277                     break;
278 
279                 default:
280                     throw new IllegalStateException(this.getStatusString());
281             }
282         }
283         
284     }
285 
286     /* ------------------------------------------------------------ */
287     /**
288      * Signal that the HttpConnection has finished handling the request.
289      * For blocking connectors, this call may block if the request has
290      * been suspended (startAsync called).
291      * @return true if handling is complete, false if the request should 
292      * be handled again (eg because of a resume that happened before unhandle was called)
293      */
294     protected boolean unhandle()
295     {
296         synchronized (this)
297         {
298             List<ContinuationListener> listeners=_continuationListeners;
299             
300             switch(_state)
301             {
302                 case __REDISPATCHED:
303                 case __DISPATCHED:
304                     _state=__UNCOMPLETED;
305                     return true;
306 
307                 case __IDLE:
308                     throw new IllegalStateException(this.getStatusString());
309 
310                 case __ASYNCSTARTED:
311                     _initial=false;
312                     _state=__ASYNCWAIT;
313                     scheduleTimeout(); // could block and change state.
314                     if (_state==__ASYNCWAIT)
315                         return true;
316                     else if (_state==__COMPLETING)
317                     {
318                         _state=__UNCOMPLETED;
319                         return true;
320                     }
321                     _initial=false;
322                     _state=__REDISPATCHED;
323                     return false; 
324 
325                 case __REDISPATCHING:
326                     _initial=false;
327                     _state=__REDISPATCHED;
328                     return false; 
329 
330                 case __COMPLETING:
331                     _initial=false;
332                     _state=__UNCOMPLETED;
333                     return true;
334 
335                 default:
336                     throw new IllegalStateException(this.getStatusString());
337             }
338         }
339     }
340 
341     /* ------------------------------------------------------------ */
342     public void dispatch()
343     {
344         boolean dispatch=false;
345         synchronized (this)
346         {
347             switch(_state)
348             {
349                 case __ASYNCSTARTED:
350                     _state=__REDISPATCHING;
351                     _resumed=true;
352                     return;
353 
354                 case __ASYNCWAIT:
355                     dispatch=!_expired;
356                     _state=__REDISPATCH;
357                     _resumed=true;
358                     break;
359                     
360                 case __REDISPATCH:
361                     return;
362                     
363                 default:
364                     throw new IllegalStateException(this.getStatusString());
365             }
366         }
367         
368         if (dispatch)
369         {
370             cancelTimeout();
371             scheduleDispatch();
372         }
373     }
374 
375     /* ------------------------------------------------------------ */
376     protected void expired()
377     {
378         final List<ContinuationListener> listeners;
379         synchronized (this)
380         {
381             switch(_state)
382             {
383                 case __ASYNCSTARTED:
384                 case __ASYNCWAIT:
385                     listeners=_continuationListeners;
386                     break;
387                 default:
388                     listeners=null;
389                     return;
390             }
391             _expired=true;
392         }
393         
394         if (listeners!=null)
395         {
396             for (int i=0;i<listeners.size();i++)
397             {
398                 ContinuationListener listener=listeners.get(i);
399                 try
400                 {
401                     listener.onTimeout(this);
402                 }
403                 catch(Exception e)
404                 {
405                     Log.warn(e);
406                 }
407             }
408         }
409         
410         synchronized (this)
411         {
412             switch(_state)
413             {
414                 case __ASYNCSTARTED:
415                 case __ASYNCWAIT:
416                     dispatch();
417             }
418         }
419 
420         scheduleDispatch();
421     }
422     
423     /* ------------------------------------------------------------ */
424     /* (non-Javadoc)
425      * @see javax.servlet.ServletRequest#complete()
426      */
427     public void complete()
428     {
429         // just like resume, except don't set _resumed=true;
430         boolean dispatch=false;
431         synchronized (this)
432         {
433             switch(_state)
434             {
435                 case __DISPATCHED:
436                 case __REDISPATCHED:
437                     throw new IllegalStateException(this.getStatusString());
438 
439                 case __ASYNCSTARTED:
440                     _state=__COMPLETING;
441                     return;
442                     
443                 case __ASYNCWAIT:
444                     _state=__COMPLETING;
445                     dispatch=!_expired;
446                     break;
447                     
448                 default:
449                     throw new IllegalStateException(this.getStatusString());
450             }
451         }
452         
453         if (dispatch)
454         {
455             cancelTimeout();
456             scheduleDispatch();
457         }
458     }
459 
460     
461     /* ------------------------------------------------------------ */
462     /* (non-Javadoc)
463      * @see javax.servlet.ServletRequest#complete()
464      */
465     protected void doComplete()
466     {
467         final List<ContinuationListener> listeners;
468         synchronized (this)
469         {
470             switch(_state)
471             {
472                 case __UNCOMPLETED:
473                     _state=__COMPLETED;
474                     listeners=_continuationListeners;
475                     break;
476                     
477                 default:
478                     listeners=null;
479                     throw new IllegalStateException(this.getStatusString());
480             }
481         }
482         
483         if (listeners!=null)
484         {
485             for(int i=0;i<listeners.size();i++)
486             {
487                 try
488                 {
489                     listeners.get(i).onComplete(this);
490                 }
491                 catch(Exception e)
492                 {
493                     Log.warn(e);
494                 }
495             }
496         }
497     }
498 
499     /* ------------------------------------------------------------ */
500     protected void recycle()
501     {
502         synchronized (this)
503         {
504 //            _history.append("r\n");
505             switch(_state)
506             {
507                 case __DISPATCHED:
508                 case __REDISPATCHED:
509                     throw new IllegalStateException(getStatusString());
510                 default:
511                     _state=__IDLE;
512             }
513             _initial = true;
514             _resumed=false;
515             _expired=false;
516             _responseWrapped=false;
517             cancelTimeout();
518             _timeoutMs=DEFAULT_TIMEOUT;
519             _continuationListeners=null;
520         }
521     }    
522     
523     /* ------------------------------------------------------------ */
524     public void cancel()
525     {
526         synchronized (this)
527         {
528             cancelTimeout();
529             _continuationListeners=null;
530         }
531     }
532 
533     /* ------------------------------------------------------------ */
534     protected void scheduleDispatch()
535     {
536         EndPoint endp=_connection.getEndPoint();
537         if (!endp.isBlocking())
538         {
539             ((AsyncEndPoint)endp).dispatch();
540         }
541     }
542 
543     /* ------------------------------------------------------------ */
544     protected void scheduleTimeout()
545     {
546         EndPoint endp=_connection.getEndPoint();
547         if (_timeoutMs>0)
548         {
549             if (endp.isBlocking())
550             {
551                 synchronized(this)
552                 {
553                     _expireAt = System.currentTimeMillis()+_timeoutMs;
554                     long wait=_timeoutMs;
555                     while (_expireAt>0 && wait>0 && _connection.getServer().isRunning())
556                     {
557                         try
558                         {
559                             this.wait(wait);
560                         }
561                         catch (InterruptedException e)
562                         {
563                             Log.ignore(e);
564                         }
565                         wait=_expireAt-System.currentTimeMillis();
566                     }
567 
568                     if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning())
569                     {
570                         expired();
571                     }
572                 }            
573             }
574             else
575             {
576                 _connection.scheduleTimeout(_event,_timeoutMs);
577             }
578         }
579     }
580 
581     /* ------------------------------------------------------------ */
582     protected void cancelTimeout()
583     {
584         EndPoint endp=_connection.getEndPoint();
585         if (endp.isBlocking())
586         {
587             synchronized(this)
588             {
589                 _expireAt=0;
590                 this.notifyAll();
591             }
592         }
593         else 
594         {
595             final AsyncEventState event=_event;
596             if (event!=null)
597                 _connection.cancelTimeout(event);
598         }
599     }
600 
601     /* ------------------------------------------------------------ */
602     public boolean isCompleting()
603     {
604         synchronized (this)
605         {
606             return _state==__COMPLETING;
607         }
608     }
609     
610     /* ------------------------------------------------------------ */
611     boolean isUncompleted()
612     {
613         synchronized (this)
614         {
615             return _state==__UNCOMPLETED;
616         }
617     } 
618     
619     /* ------------------------------------------------------------ */
620     public boolean isComplete()
621     {
622         synchronized (this)
623         {
624             return _state==__COMPLETED;
625         }
626     }
627 
628 
629     /* ------------------------------------------------------------ */
630     public boolean isAsyncStarted()
631     {
632         synchronized (this)
633         {
634             switch(_state)
635             {
636                 case __ASYNCSTARTED:
637                 case __REDISPATCHING:
638                 case __REDISPATCH:
639                 case __ASYNCWAIT:
640                     return true;
641 
642                 default:
643                     return false;
644             }
645         }
646     }
647 
648 
649     /* ------------------------------------------------------------ */
650     public boolean isAsync()
651     {
652         synchronized (this)
653         {
654             switch(_state)
655             {
656                 case __IDLE:
657                 case __DISPATCHED:
658                 case __UNCOMPLETED:
659                 case __COMPLETED:
660                     return false;
661 
662                 default:
663                     return true;
664             }
665         }
666     }
667 
668     /* ------------------------------------------------------------ */
669     public void dispatch(ServletContext context, String path)
670     {
671         _event._dispatchContext=context;
672         _event._path=path;
673         dispatch();
674     }
675 
676     /* ------------------------------------------------------------ */
677     public void dispatch(String path)
678     {
679         _event._path=path;
680         dispatch();
681     }
682 
683     /* ------------------------------------------------------------ */
684     public Request getBaseRequest()
685     {
686         return _connection.getRequest();
687     }
688     
689     /* ------------------------------------------------------------ */
690     public ServletRequest getRequest()
691     {
692         if (_event!=null)
693             return _event.getRequest();
694         return _connection.getRequest();
695     }
696 
697     /* ------------------------------------------------------------ */
698     public ServletResponse getResponse()
699     {
700         if (_event!=null)
701             return _event.getResponse();
702         return _connection.getResponse();
703     }
704 
705     /* ------------------------------------------------------------ */
706     public void start(final Runnable run)
707     {
708         final AsyncEventState event=_event;
709         if (event!=null)
710         {
711             _connection.getServer().getThreadPool().dispatch(new Runnable()
712             {
713                 public void run()
714                 {
715                     ((Context)event.getServletContext()).getContextHandler().handle(run);
716                 }
717             });
718         }
719     }
720 
721     /* ------------------------------------------------------------ */
722     public boolean hasOriginalRequestAndResponse()
723     {
724         synchronized (this)
725         {
726             return (_event!=null && _event.getRequest()==_connection._request && _event.getResponse()==_connection._response);
727         }
728     }
729 
730     /* ------------------------------------------------------------ */
731     public ContextHandler getContextHandler()
732     {
733         final AsyncEventState event=_event;
734         if (event!=null)
735             return ((Context)event.getServletContext()).getContextHandler();
736         return null;
737     }
738 
739 
740     /* ------------------------------------------------------------ */
741     /**
742      * @see Continuation#isResumed()
743      */
744     public boolean isResumed()
745     {
746         synchronized (this)
747         {
748             return _resumed;
749         }
750     }
751     /* ------------------------------------------------------------ */
752     /**
753      * @see Continuation#isExpired()
754      */
755     public boolean isExpired()
756     {
757         synchronized (this)
758         {
759             return _expired;
760         }
761     }
762 
763     /* ------------------------------------------------------------ */
764     /**
765      * @see Continuation#resume()
766      */
767     public void resume()
768     {
769         dispatch();
770     }
771     
772     /* ------------------------------------------------------------ */
773     /**
774      * @see Continuation#suspend()
775      */
776     public void suspend(ServletResponse response)
777     {
778         _responseWrapped=!(response instanceof Response);
779         AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response); 
780     }
781 
782     /* ------------------------------------------------------------ */
783     /**
784      * @see Continuation#suspend()
785      */
786     public void suspend()
787     {
788         _responseWrapped=false;
789         AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());       
790     }
791 
792     /* ------------------------------------------------------------ */
793     /**
794      * @see org.eclipse.jetty.continuation.Continuation#getServletResponse()
795      */
796     public ServletResponse getServletResponse()
797     {
798         if (_responseWrapped && _event!=null && _event.getResponse()!=null)
799             return _event.getResponse();
800         return _connection.getResponse();
801     }
802 
803     /* ------------------------------------------------------------ */
804     /**
805      * @see org.eclipse.jetty.continuation.Continuation#getAttribute(java.lang.String)
806      */
807     public Object getAttribute(String name)
808     {
809         return _connection.getRequest().getAttribute(name);
810     }
811 
812     /* ------------------------------------------------------------ */
813     /**
814      * @see org.eclipse.jetty.continuation.Continuation#removeAttribute(java.lang.String)
815      */
816     public void removeAttribute(String name)
817     {
818         _connection.getRequest().removeAttribute(name);
819     }
820 
821     /* ------------------------------------------------------------ */
822     /**
823      * @see org.eclipse.jetty.continuation.Continuation#setAttribute(java.lang.String, java.lang.Object)
824      */
825     public void setAttribute(String name, Object attribute)
826     {
827         _connection.getRequest().setAttribute(name,attribute);
828     }
829 
830     /* ------------------------------------------------------------ */
831     /**
832      * @see org.eclipse.jetty.continuation.Continuation#undispatch()
833      */
834     public void undispatch()
835     {
836         if (isSuspended())
837         {
838             if (Log.isDebugEnabled())
839                 throw new ContinuationThrowable();
840             else
841                 throw __exception;
842         }
843         throw new IllegalStateException("!suspended");
844     }
845 
846     /* ------------------------------------------------------------ */
847     /* ------------------------------------------------------------ */
848     public class AsyncEventState extends Timeout.Task implements Runnable
849     {
850         private final ServletContext _suspendedContext;
851         private final ServletRequest _request;
852         private final ServletResponse _response;
853         private ServletContext _dispatchContext;
854         private String _path;
855         
856         public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
857         {
858             _suspendedContext=context;
859             _request=request;
860             _response=response;
861         }
862         
863         public ServletContext getSuspendedContext()
864         {
865             return _suspendedContext;
866         }
867         
868         public ServletContext getDispatchContext()
869         {
870             return _dispatchContext;
871         }
872         
873         public ServletContext getServletContext()
874         {
875             return _dispatchContext==null?_suspendedContext:_dispatchContext;
876         }
877 
878         public ServletRequest getRequest()
879         {
880             return _request;
881         }
882 
883         public ServletResponse getResponse()
884         {
885             return _response;
886         }
887         
888         public String getPath()
889         {
890             return _path;
891         }
892 
893         @Override
894         public void expired()
895         {
896             AsyncContinuation.this.expired();
897         }
898         
899         public void run()
900         {
901             AsyncContinuation.this.expired();  
902         }
903     }
904 }