View Javadoc

1   // ========================================================================
2   // Copyright (c) 2007-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.server;
15  
16  import java.util.ArrayList;
17  import java.util.List;
18  
19  import javax.servlet.ServletContext;
20  import javax.servlet.ServletRequest;
21  import javax.servlet.ServletResponse;
22  
23  import org.eclipse.jetty.continuation.Continuation;
24  import org.eclipse.jetty.continuation.ContinuationListener;
25  import org.eclipse.jetty.continuation.ContinuationThrowable;
26  import org.eclipse.jetty.io.AsyncEndPoint;
27  import org.eclipse.jetty.io.EndPoint;
28  import org.eclipse.jetty.server.handler.ContextHandler;
29  import org.eclipse.jetty.server.handler.ContextHandler.Context;
30  import org.eclipse.jetty.util.log.Log;
31  import org.eclipse.jetty.util.log.Logger;
32  import org.eclipse.jetty.util.thread.Timeout;
33  
34  /* ------------------------------------------------------------ */
35  /** Implementation of Continuation and AsyncContext interfaces.
36   * 
37   */
38  public class AsyncContinuation implements AsyncContext, Continuation
39  {
40      private static final Logger LOG = Log.getLogger(AsyncContinuation.class);
41  
42      private final static long DEFAULT_TIMEOUT=30000L;
43      
44      private final static ContinuationThrowable __exception = new ContinuationThrowable();
45      
46      // STATES:
47      //               handling()    suspend()     unhandle()    resume()       complete()  doComplete()
48      //                             startAsync()                dispatch()   
49      // IDLE          DISPATCHED      
50      // DISPATCHED                  ASYNCSTARTED  UNCOMPLETED
51      // ASYNCSTARTED                              ASYNCWAIT     REDISPATCHING  COMPLETING
52      // REDISPATCHING                             REDISPATCHED  
53      // ASYNCWAIT                                               REDISPATCH     COMPLETING
54      // REDISPATCH    REDISPATCHED
55      // REDISPATCHED                ASYNCSTARTED  UNCOMPLETED
56      // COMPLETING    UNCOMPLETED                 UNCOMPLETED
57      // UNCOMPLETED                                                                        COMPLETED
58      // COMPLETED
59      private static final int __IDLE=0;         // Idle request
60      private static final int __DISPATCHED=1;   // Request dispatched to filter/servlet
61      private static final int __ASYNCSTARTED=2; // Suspend called, but not yet returned to container
62      private static final int __REDISPATCHING=3;// resumed while dispatched
63      private static final int __ASYNCWAIT=4;    // Suspended and parked
64      private static final int __REDISPATCH=5;   // Has been scheduled
65      private static final int __REDISPATCHED=6; // Request redispatched to filter/servlet
66      private static final int __COMPLETING=7;   // complete while dispatched
67      private static final int __UNCOMPLETED=8;  // Request is completable
68      private static final int __COMPLETED=9;    // Request is complete
69      
70  
71      /* ------------------------------------------------------------ */
72      protected AbstractHttpConnection _connection;
73      private List<ContinuationListener> _continuationListeners;
74  
75      /* ------------------------------------------------------------ */
76      private int _state;
77      private boolean _initial;
78      private boolean _resumed;
79      private boolean _expired;
80      private volatile boolean _responseWrapped;
81      private long _timeoutMs=DEFAULT_TIMEOUT;
82      private AsyncEventState _event;
83      private volatile long _expireAt;
84      
85      /* ------------------------------------------------------------ */
86      protected AsyncContinuation()
87      {
88          _state=__IDLE;
89          _initial=true;
90      }
91  
92      /* ------------------------------------------------------------ */
93      protected void setConnection(final AbstractHttpConnection connection)
94      {
95          synchronized(this)
96          {
97              _connection=connection;
98          }
99      }
100 
101     /* ------------------------------------------------------------ */
102     public void addContinuationListener(ContinuationListener listener)
103     {
104         synchronized(this)
105         {
106             if (_continuationListeners==null)
107                 _continuationListeners=new ArrayList<ContinuationListener>();
108             _continuationListeners.add(listener);
109         }
110     }
111 
112     /* ------------------------------------------------------------ */
113     public void setTimeout(long ms)
114     {
115         synchronized(this)
116         {
117             _timeoutMs=ms;
118         }
119     } 
120 
121     /* ------------------------------------------------------------ */
122     public long getTimeout()
123     {
124         synchronized(this)
125         {
126             return _timeoutMs;
127         }
128     } 
129 
130     /* ------------------------------------------------------------ */
131     public AsyncEventState getAsyncEventState()
132     {
133         synchronized(this)
134         {
135             return _event;
136         }
137     } 
138    
139     /* ------------------------------------------------------------ */
140     /**
141      * @see org.eclipse.jetty.continuation.Continuation#keepWrappers()
142      */
143 
144     /* ------------------------------------------------------------ */
145     /**
146      * @see org.eclipse.jetty.continuation.Continuation#isResponseWrapped()
147      */
148     public boolean isResponseWrapped()
149     {
150         return _responseWrapped;
151     }
152 
153     /* ------------------------------------------------------------ */
154     /* (non-Javadoc)
155      * @see javax.servlet.ServletRequest#isInitial()
156      */
157     public boolean isInitial()
158     {
159         synchronized(this)
160         {
161             return _initial;
162         }
163     }
164     
165     /* ------------------------------------------------------------ */
166     /* (non-Javadoc)
167      * @see javax.servlet.ServletRequest#isSuspended()
168      */
169     public boolean isSuspended()
170     {
171         synchronized(this)
172         {
173             switch(_state)
174             {
175                 case __ASYNCSTARTED:
176                 case __REDISPATCHING:
177                 case __COMPLETING:
178                 case __ASYNCWAIT:
179                     return true;
180                     
181                 default:
182                     return false;   
183             }
184         }
185     }
186     
187     /* ------------------------------------------------------------ */
188     public boolean isSuspending()
189     {
190         synchronized(this)
191         {
192             switch(_state)
193             {
194                 case __ASYNCSTARTED:
195                 case __ASYNCWAIT:
196                     return true;
197                     
198                 default:
199                     return false;   
200             }
201         }
202     }
203     
204     /* ------------------------------------------------------------ */
205     public boolean isDispatchable()
206     {
207         synchronized(this)
208         {
209             switch(_state)
210             {
211                 case __REDISPATCH:
212                 case __REDISPATCHED:
213                 case __REDISPATCHING:
214                 case __COMPLETING:
215                     return true;
216                     
217                 default:
218                     return false;   
219             }
220         }
221     }
222 
223     /* ------------------------------------------------------------ */
224     @Override
225     public String toString()
226     {
227         synchronized (this)
228         {
229             return super.toString()+"@"+getStatusString();
230         }
231     }
232 
233     /* ------------------------------------------------------------ */
234     public String getStatusString()
235     {
236         synchronized (this)
237         {
238             return
239             ((_state==__IDLE)?"IDLE":
240                 (_state==__DISPATCHED)?"DISPATCHED":
241                     (_state==__ASYNCSTARTED)?"ASYNCSTARTED":
242                         (_state==__ASYNCWAIT)?"ASYNCWAIT":
243                             (_state==__REDISPATCHING)?"REDISPATCHING":
244                                 (_state==__REDISPATCH)?"REDISPATCH":
245                                     (_state==__REDISPATCHED)?"REDISPATCHED":
246                                         (_state==__COMPLETING)?"COMPLETING":
247                                             (_state==__UNCOMPLETED)?"UNCOMPLETED":
248                                                 (_state==__COMPLETED)?"COMPLETE":
249                                                     ("UNKNOWN?"+_state))+
250             (_initial?",initial":"")+
251             (_resumed?",resumed":"")+
252             (_expired?",expired":"");
253         }
254     }
255 
256     /* ------------------------------------------------------------ */
257     /**
258      * @return false if the handling of the request should not proceed
259      */
260     protected boolean handling()
261     {
262         synchronized (this)
263         {
264             _responseWrapped=false;
265             
266             switch(_state)
267             {
268                 case __IDLE:
269                     _initial=true;
270                     _state=__DISPATCHED;
271                     return true;
272                     
273                 case __COMPLETING:
274                     _state=__UNCOMPLETED;
275                     return false;
276 
277                 case __ASYNCWAIT:
278                     return false;
279                     
280                 case __REDISPATCH:
281                     _state=__REDISPATCHED;
282                     return true;
283 
284                 default:
285                     throw new IllegalStateException(this.getStatusString());
286             }
287         }
288     }
289 
290     /* ------------------------------------------------------------ */
291     /* (non-Javadoc)
292      * @see javax.servlet.ServletRequest#suspend(long)
293      */
294     protected void suspend(final ServletContext context,
295             final ServletRequest request,
296             final ServletResponse response)
297     {
298         synchronized (this)
299         {
300             switch(_state)
301             {
302                 case __DISPATCHED:
303                 case __REDISPATCHED:
304                     _resumed=false;
305                     _expired=false;
306 
307                     if (_event==null || request!=_event.getRequest() || response != _event.getResponse() || context != _event.getServletContext())
308                         _event=new AsyncEventState(context,request,response);
309                     else
310                     {
311                         _event._dispatchContext=null;
312                         _event._path=null;
313                     }
314 
315                     _state=__ASYNCSTARTED;
316                     break;
317 
318                 default:
319                     throw new IllegalStateException(this.getStatusString());
320             }
321         }
322         
323     }
324 
325     /* ------------------------------------------------------------ */
326     /**
327      * Signal that the HttpConnection has finished handling the request.
328      * For blocking connectors, this call may block if the request has
329      * been suspended (startAsync called).
330      * @return true if handling is complete, false if the request should 
331      * be handled again (eg because of a resume that happened before unhandle was called)
332      */
333     protected boolean unhandle()
334     {
335         synchronized (this)
336         {
337             List<ContinuationListener> listeners=_continuationListeners;
338             
339             switch(_state)
340             {
341                 case __REDISPATCHED:
342                 case __DISPATCHED:
343                     _state=__UNCOMPLETED;
344                     return true;
345 
346                 case __IDLE:
347                     throw new IllegalStateException(this.getStatusString());
348 
349                 case __ASYNCSTARTED:
350                     _initial=false;
351                     _state=__ASYNCWAIT;
352                     scheduleTimeout(); // could block and change state.
353                     if (_state==__ASYNCWAIT)
354                         return true;
355                     else if (_state==__COMPLETING)
356                     {
357                         _state=__UNCOMPLETED;
358                         return true;
359                     }
360                     _initial=false;
361                     _state=__REDISPATCHED;
362                     return false; 
363 
364                 case __REDISPATCHING:
365                     _initial=false;
366                     _state=__REDISPATCHED;
367                     return false; 
368 
369                 case __COMPLETING:
370                     _initial=false;
371                     _state=__UNCOMPLETED;
372                     return true;
373 
374                 default:
375                     throw new IllegalStateException(this.getStatusString());
376             }
377         }
378     }
379 
380     /* ------------------------------------------------------------ */
381     public void dispatch()
382     {
383         boolean dispatch=false;
384         synchronized (this)
385         {
386             switch(_state)
387             {
388                 case __ASYNCSTARTED:
389                     _state=__REDISPATCHING;
390                     _resumed=true;
391                     return;
392 
393                 case __ASYNCWAIT:
394                     dispatch=!_expired;
395                     _state=__REDISPATCH;
396                     _resumed=true;
397                     break;
398                     
399                 case __REDISPATCH:
400                     return;
401                     
402                 default:
403                     throw new IllegalStateException(this.getStatusString());
404             }
405         }
406         
407         if (dispatch)
408         {
409             cancelTimeout();
410             scheduleDispatch();
411         }
412     }
413 
414     /* ------------------------------------------------------------ */
415     protected void expired()
416     {
417         final List<ContinuationListener> listeners;
418         synchronized (this)
419         {
420             switch(_state)
421             {
422                 case __ASYNCSTARTED:
423                 case __ASYNCWAIT:
424                     listeners=_continuationListeners;
425                     break;
426                 default:
427                     listeners=null;
428                     return;
429             }
430             _expired=true;
431         }
432         
433         if (listeners!=null)
434         {
435             for (int i=0;i<listeners.size();i++)
436             {
437                 ContinuationListener listener=listeners.get(i);
438                 try
439                 {
440                     listener.onTimeout(this);
441                 }
442                 catch(Exception e)
443                 {
444                     LOG.warn(e);
445                 }
446             }
447         }
448         
449         synchronized (this)
450         {
451             switch(_state)
452             {
453                 case __ASYNCSTARTED:
454                 case __ASYNCWAIT:
455                     dispatch();
456             }
457         }
458 
459         scheduleDispatch();
460     }
461     
462     /* ------------------------------------------------------------ */
463     /* (non-Javadoc)
464      * @see javax.servlet.ServletRequest#complete()
465      */
466     public void complete()
467     {
468         // just like resume, except don't set _resumed=true;
469         boolean dispatch=false;
470         synchronized (this)
471         {
472             switch(_state)
473             {
474                 case __DISPATCHED:
475                 case __REDISPATCHED:
476                     throw new IllegalStateException(this.getStatusString());
477 
478                 case __ASYNCSTARTED:
479                     _state=__COMPLETING;
480                     return;
481                     
482                 case __ASYNCWAIT:
483                     _state=__COMPLETING;
484                     dispatch=!_expired;
485                     break;
486                     
487                 default:
488                     throw new IllegalStateException(this.getStatusString());
489             }
490         }
491         
492         if (dispatch)
493         {
494             cancelTimeout();
495             scheduleDispatch();
496         }
497     }
498 
499     
500     /* ------------------------------------------------------------ */
501     /* (non-Javadoc)
502      * @see javax.servlet.ServletRequest#complete()
503      */
504     protected void doComplete()
505     {
506         final List<ContinuationListener> listeners;
507         synchronized (this)
508         {
509             switch(_state)
510             {
511                 case __UNCOMPLETED:
512                     _state=__COMPLETED;
513                     listeners=_continuationListeners;
514                     break;
515                     
516                 default:
517                     listeners=null;
518                     throw new IllegalStateException(this.getStatusString());
519             }
520         }
521         
522         if (listeners!=null)
523         {
524             for(int i=0;i<listeners.size();i++)
525             {
526                 try
527                 {
528                     listeners.get(i).onComplete(this);
529                 }
530                 catch(Exception e)
531                 {
532                     LOG.warn(e);
533                 }
534             }
535         }
536     }
537 
538     /* ------------------------------------------------------------ */
539     protected void recycle()
540     {
541         synchronized (this)
542         {
543 //            _history.append("r\n");
544             switch(_state)
545             {
546                 case __DISPATCHED:
547                 case __REDISPATCHED:
548                     throw new IllegalStateException(getStatusString());
549                 default:
550                     _state=__IDLE;
551             }
552             _initial = true;
553             _resumed=false;
554             _expired=false;
555             _responseWrapped=false;
556             cancelTimeout();
557             _timeoutMs=DEFAULT_TIMEOUT;
558             _continuationListeners=null;
559         }
560     }    
561     
562     /* ------------------------------------------------------------ */
563     public void cancel()
564     {
565         synchronized (this)
566         {
567             cancelTimeout();
568             _continuationListeners=null;
569         }
570     }
571 
572     /* ------------------------------------------------------------ */
573     protected void scheduleDispatch()
574     {
575         EndPoint endp=_connection.getEndPoint();
576         if (!endp.isBlocking())
577         {
578             ((AsyncEndPoint)endp).asyncDispatch();
579         }
580     }
581 
582     /* ------------------------------------------------------------ */
583     protected void scheduleTimeout()
584     {
585         EndPoint endp=_connection.getEndPoint();
586         if (_timeoutMs>0)
587         {
588             if (endp.isBlocking())
589             {
590                 synchronized(this)
591                 {
592                     _expireAt = System.currentTimeMillis()+_timeoutMs;
593                     long wait=_timeoutMs;
594                     while (_expireAt>0 && wait>0 && _connection.getServer().isRunning())
595                     {
596                         try
597                         {
598                             this.wait(wait);
599                         }
600                         catch (InterruptedException e)
601                         {
602                             LOG.ignore(e);
603                         }
604                         wait=_expireAt-System.currentTimeMillis();
605                     }
606 
607                     if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning())
608                     {
609                         expired();
610                     }
611                 }            
612             }
613             else
614             {
615                 ((AsyncEndPoint)endp).scheduleTimeout(_event,_timeoutMs);
616             }
617         }
618     }
619 
620     /* ------------------------------------------------------------ */
621     protected void cancelTimeout()
622     {
623         EndPoint endp=_connection.getEndPoint();
624         if (endp.isBlocking())
625         {
626             synchronized(this)
627             {
628                 _expireAt=0;
629                 this.notifyAll();
630             }
631         }
632         else 
633         {
634             final AsyncEventState event=_event;
635             if (event!=null)
636             {
637                 ((AsyncEndPoint)endp).cancelTimeout(event);
638             }
639         }
640     }
641 
642     /* ------------------------------------------------------------ */
643     public boolean isCompleting()
644     {
645         synchronized (this)
646         {
647             return _state==__COMPLETING;
648         }
649     }
650     
651     /* ------------------------------------------------------------ */
652     boolean isUncompleted()
653     {
654         synchronized (this)
655         {
656             return _state==__UNCOMPLETED;
657         }
658     } 
659     
660     /* ------------------------------------------------------------ */
661     public boolean isComplete()
662     {
663         synchronized (this)
664         {
665             return _state==__COMPLETED;
666         }
667     }
668 
669 
670     /* ------------------------------------------------------------ */
671     public boolean isAsyncStarted()
672     {
673         synchronized (this)
674         {
675             switch(_state)
676             {
677                 case __ASYNCSTARTED:
678                 case __REDISPATCHING:
679                 case __REDISPATCH:
680                 case __ASYNCWAIT:
681                     return true;
682 
683                 default:
684                     return false;
685             }
686         }
687     }
688 
689 
690     /* ------------------------------------------------------------ */
691     public boolean isAsync()
692     {
693         synchronized (this)
694         {
695             switch(_state)
696             {
697                 case __IDLE:
698                 case __DISPATCHED:
699                 case __UNCOMPLETED:
700                 case __COMPLETED:
701                     return false;
702 
703                 default:
704                     return true;
705             }
706         }
707     }
708 
709     /* ------------------------------------------------------------ */
710     public void dispatch(ServletContext context, String path)
711     {
712         _event._dispatchContext=context;
713         _event._path=path;
714         dispatch();
715     }
716 
717     /* ------------------------------------------------------------ */
718     public void dispatch(String path)
719     {
720         _event._path=path;
721         dispatch();
722     }
723 
724     /* ------------------------------------------------------------ */
725     public Request getBaseRequest()
726     {
727         return _connection.getRequest();
728     }
729     
730     /* ------------------------------------------------------------ */
731     public ServletRequest getRequest()
732     {
733         if (_event!=null)
734             return _event.getRequest();
735         return _connection.getRequest();
736     }
737 
738     /* ------------------------------------------------------------ */
739     public ServletResponse getResponse()
740     {
741         if (_event!=null)
742             return _event.getResponse();
743         return _connection.getResponse();
744     }
745 
746     /* ------------------------------------------------------------ */
747     public void start(final Runnable run)
748     {
749         final AsyncEventState event=_event;
750         if (event!=null)
751         {
752             _connection.getServer().getThreadPool().dispatch(new Runnable()
753             {
754                 public void run()
755                 {
756                     ((Context)event.getServletContext()).getContextHandler().handle(run);
757                 }
758             });
759         }
760     }
761 
762     /* ------------------------------------------------------------ */
763     public boolean hasOriginalRequestAndResponse()
764     {
765         synchronized (this)
766         {
767             return (_event!=null && _event.getRequest()==_connection._request && _event.getResponse()==_connection._response);
768         }
769     }
770 
771     /* ------------------------------------------------------------ */
772     public ContextHandler getContextHandler()
773     {
774         final AsyncEventState event=_event;
775         if (event!=null)
776             return ((Context)event.getServletContext()).getContextHandler();
777         return null;
778     }
779 
780 
781     /* ------------------------------------------------------------ */
782     /**
783      * @see Continuation#isResumed()
784      */
785     public boolean isResumed()
786     {
787         synchronized (this)
788         {
789             return _resumed;
790         }
791     }
792     /* ------------------------------------------------------------ */
793     /**
794      * @see Continuation#isExpired()
795      */
796     public boolean isExpired()
797     {
798         synchronized (this)
799         {
800             return _expired;
801         }
802     }
803 
804     /* ------------------------------------------------------------ */
805     /**
806      * @see Continuation#resume()
807      */
808     public void resume()
809     {
810         dispatch();
811     }
812     
813     /* ------------------------------------------------------------ */
814     /**
815      * @see Continuation#suspend()
816      */
817     public void suspend(ServletResponse response)
818     {
819         _responseWrapped=!(response instanceof Response);
820         AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response); 
821     }
822 
823     /* ------------------------------------------------------------ */
824     /**
825      * @see Continuation#suspend()
826      */
827     public void suspend()
828     {
829         _responseWrapped=false;
830         AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());       
831     }
832 
833     /* ------------------------------------------------------------ */
834     /**
835      * @see org.eclipse.jetty.continuation.Continuation#getServletResponse()
836      */
837     public ServletResponse getServletResponse()
838     {
839         if (_responseWrapped && _event!=null && _event.getResponse()!=null)
840             return _event.getResponse();
841         return _connection.getResponse();
842     }
843 
844     /* ------------------------------------------------------------ */
845     /**
846      * @see org.eclipse.jetty.continuation.Continuation#getAttribute(java.lang.String)
847      */
848     public Object getAttribute(String name)
849     {
850         return _connection.getRequest().getAttribute(name);
851     }
852 
853     /* ------------------------------------------------------------ */
854     /**
855      * @see org.eclipse.jetty.continuation.Continuation#removeAttribute(java.lang.String)
856      */
857     public void removeAttribute(String name)
858     {
859         _connection.getRequest().removeAttribute(name);
860     }
861 
862     /* ------------------------------------------------------------ */
863     /**
864      * @see org.eclipse.jetty.continuation.Continuation#setAttribute(java.lang.String, java.lang.Object)
865      */
866     public void setAttribute(String name, Object attribute)
867     {
868         _connection.getRequest().setAttribute(name,attribute);
869     }
870 
871     /* ------------------------------------------------------------ */
872     /**
873      * @see org.eclipse.jetty.continuation.Continuation#undispatch()
874      */
875     public void undispatch()
876     {
877         if (isSuspended())
878         {
879             if (LOG.isDebugEnabled())
880                 throw new ContinuationThrowable();
881             else
882                 throw __exception;
883         }
884         throw new IllegalStateException("!suspended");
885     }
886 
887     /* ------------------------------------------------------------ */
888     /* ------------------------------------------------------------ */
889     public class AsyncEventState extends Timeout.Task implements Runnable
890     {
891         private final ServletContext _suspendedContext;
892         private final ServletRequest _request;
893         private final ServletResponse _response;
894         private ServletContext _dispatchContext;
895         private String _path;
896         
897         public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
898         {
899             _suspendedContext=context;
900             _request=request;
901             _response=response;
902         }
903         
904         public ServletContext getSuspendedContext()
905         {
906             return _suspendedContext;
907         }
908         
909         public ServletContext getDispatchContext()
910         {
911             return _dispatchContext;
912         }
913         
914         public ServletContext getServletContext()
915         {
916             return _dispatchContext==null?_suspendedContext:_dispatchContext;
917         }
918 
919         public ServletRequest getRequest()
920         {
921             return _request;
922         }
923 
924         public ServletResponse getResponse()
925         {
926             return _response;
927         }
928         
929         public String getPath()
930         {
931             return _path;
932         }
933 
934         @Override
935         public void expired()
936         {
937             AsyncContinuation.this.expired();
938         }
939         
940         public void run()
941         {
942             AsyncContinuation.this.expired();  
943         }
944     }
945 }