View Javadoc

1   // ========================================================================
2   // Copyright (c) 2007-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.server;
15  
16  import java.util.ArrayList;
17  import java.util.List;
18  
19  import javax.servlet.ServletContext;
20  import javax.servlet.ServletRequest;
21  import javax.servlet.ServletResponse;
22  
23  import org.eclipse.jetty.continuation.Continuation;
24  import org.eclipse.jetty.continuation.ContinuationListener;
25  import org.eclipse.jetty.continuation.ContinuationThrowable;
26  import org.eclipse.jetty.io.AsyncEndPoint;
27  import org.eclipse.jetty.io.EndPoint;
28  import org.eclipse.jetty.server.handler.ContextHandler;
29  import org.eclipse.jetty.server.handler.ContextHandler.Context;
30  import org.eclipse.jetty.util.log.Log;
31  import org.eclipse.jetty.util.log.Logger;
32  import org.eclipse.jetty.util.thread.Timeout;
33  
34  /* ------------------------------------------------------------ */
35  /** Implementation of Continuation and AsyncContext interfaces.
36   * 
37   */
38  public class AsyncContinuation implements AsyncContext, Continuation
39  {
40      private static final Logger LOG = Log.getLogger(AsyncContinuation.class);
41  
42      private final static long DEFAULT_TIMEOUT=30000L;
43      
44      private final static ContinuationThrowable __exception = new ContinuationThrowable();
45      
46      // STATES:
47      //               handling()    suspend()     unhandle()    resume()       complete()  doComplete()
48      //                             startAsync()                dispatch()   
49      // IDLE          DISPATCHED      
50      // DISPATCHED                  ASYNCSTARTED  UNCOMPLETED
51      // ASYNCSTARTED                              ASYNCWAIT     REDISPATCHING  COMPLETING
52      // REDISPATCHING                             REDISPATCHED  
53      // ASYNCWAIT                                               REDISPATCH     COMPLETING
54      // REDISPATCH    REDISPATCHED
55      // REDISPATCHED                ASYNCSTARTED  UNCOMPLETED
56      // COMPLETING    UNCOMPLETED                 UNCOMPLETED
57      // UNCOMPLETED                                                                        COMPLETED
58      // COMPLETED
59      private static final int __IDLE=0;         // Idle request
60      private static final int __DISPATCHED=1;   // Request dispatched to filter/servlet
61      private static final int __ASYNCSTARTED=2; // Suspend called, but not yet returned to container
62      private static final int __REDISPATCHING=3;// resumed while dispatched
63      private static final int __ASYNCWAIT=4;    // Suspended and parked
64      private static final int __REDISPATCH=5;   // Has been scheduled
65      private static final int __REDISPATCHED=6; // Request redispatched to filter/servlet
66      private static final int __COMPLETING=7;   // complete while dispatched
67      private static final int __UNCOMPLETED=8;  // Request is completable
68      private static final int __COMPLETED=9;    // Request is complete
69      
70  
71      /* ------------------------------------------------------------ */
72      protected HttpConnection _connection;
73      private List<ContinuationListener> _continuationListeners;
74  
75      /* ------------------------------------------------------------ */
76      private int _state;
77      private boolean _initial;
78      private boolean _resumed;
79      private boolean _expired;
80      private volatile boolean _responseWrapped;
81      private long _timeoutMs=DEFAULT_TIMEOUT;
82      private AsyncEventState _event;
83      private volatile long _expireAt;
84      
85      /* ------------------------------------------------------------ */
86      protected AsyncContinuation()
87      {
88          _state=__IDLE;
89          _initial=true;
90      }
91  
92      /* ------------------------------------------------------------ */
93      protected void setConnection(final HttpConnection connection)
94      {
95          synchronized(this)
96          {
97              _connection=connection;
98          }
99      }
100 
101     /* ------------------------------------------------------------ */
102     public void addContinuationListener(ContinuationListener listener)
103     {
104         synchronized(this)
105         {
106             if (_continuationListeners==null)
107                 _continuationListeners=new ArrayList<ContinuationListener>();
108             _continuationListeners.add(listener);
109         }
110     }
111 
112     /* ------------------------------------------------------------ */
113     public void setTimeout(long ms)
114     {
115         synchronized(this)
116         {
117             _timeoutMs=ms;
118         }
119     } 
120 
121     /* ------------------------------------------------------------ */
122     public long getTimeout()
123     {
124         synchronized(this)
125         {
126             return _timeoutMs;
127         }
128     } 
129 
130     /* ------------------------------------------------------------ */
131     public AsyncEventState getAsyncEventState()
132     {
133         synchronized(this)
134         {
135             return _event;
136         }
137     } 
138    
139     /* ------------------------------------------------------------ */
140     /**
141      * @see org.eclipse.jetty.continuation.Continuation#keepWrappers()
142      */
143 
144     /* ------------------------------------------------------------ */
145     /**
146      * @see org.eclipse.jetty.continuation.Continuation#isResponseWrapped()
147      */
148     public boolean isResponseWrapped()
149     {
150         return _responseWrapped;
151     }
152 
153     /* ------------------------------------------------------------ */
154     /* (non-Javadoc)
155      * @see javax.servlet.ServletRequest#isInitial()
156      */
157     public boolean isInitial()
158     {
159         synchronized(this)
160         {
161             return _initial;
162         }
163     }
164     
165     /* ------------------------------------------------------------ */
166     /* (non-Javadoc)
167      * @see javax.servlet.ServletRequest#isSuspended()
168      */
169     public boolean isSuspended()
170     {
171         synchronized(this)
172         {
173             switch(_state)
174             {
175                 case __ASYNCSTARTED:
176                 case __REDISPATCHING:
177                 case __COMPLETING:
178                 case __ASYNCWAIT:
179                     return true;
180                     
181                 default:
182                     return false;   
183             }
184         }
185     }
186 
187     /* ------------------------------------------------------------ */
188     @Override
189     public String toString()
190     {
191         synchronized (this)
192         {
193             return super.toString()+"@"+getStatusString();
194         }
195     }
196 
197     /* ------------------------------------------------------------ */
198     public String getStatusString()
199     {
200         synchronized (this)
201         {
202             return
203             ((_state==__IDLE)?"IDLE":
204                 (_state==__DISPATCHED)?"DISPATCHED":
205                     (_state==__ASYNCSTARTED)?"ASYNCSTARTED":
206                         (_state==__ASYNCWAIT)?"ASYNCWAIT":
207                             (_state==__REDISPATCHING)?"REDISPATCHING":
208                                 (_state==__REDISPATCH)?"REDISPATCH":
209                                     (_state==__REDISPATCHED)?"REDISPATCHED":
210                                         (_state==__COMPLETING)?"COMPLETING":
211                                             (_state==__UNCOMPLETED)?"UNCOMPLETED":
212                                                 (_state==__COMPLETED)?"COMPLETE":
213                                                     ("UNKNOWN?"+_state))+
214             (_initial?",initial":"")+
215             (_resumed?",resumed":"")+
216             (_expired?",expired":"");
217         }
218     }
219 
220     /* ------------------------------------------------------------ */
221     /**
222      * @return false if the handling of the request should not proceed
223      */
224     protected boolean handling()
225     {
226         synchronized (this)
227         {
228             _responseWrapped=false;
229             
230             switch(_state)
231             {
232                 case __IDLE:
233                     _initial=true;
234                     _state=__DISPATCHED;
235                     return true;
236                     
237                 case __COMPLETING:
238                     _state=__UNCOMPLETED;
239                     return false;
240 
241                 case __ASYNCWAIT:
242                     return false;
243                     
244                 case __REDISPATCH:
245                     _state=__REDISPATCHED;
246                     return true;
247 
248                 default:
249                     throw new IllegalStateException(this.getStatusString());
250             }
251         }
252     }
253 
254     /* ------------------------------------------------------------ */
255     /* (non-Javadoc)
256      * @see javax.servlet.ServletRequest#suspend(long)
257      */
258     protected void suspend(final ServletContext context,
259             final ServletRequest request,
260             final ServletResponse response)
261     {
262         synchronized (this)
263         {
264             switch(_state)
265             {
266                 case __DISPATCHED:
267                 case __REDISPATCHED:
268                     _resumed=false;
269                     _expired=false;
270 
271                     if (_event==null || request!=_event.getRequest() || response != _event.getResponse() || context != _event.getServletContext())
272                         _event=new AsyncEventState(context,request,response);
273                     else
274                     {
275                         _event._dispatchContext=null;
276                         _event._path=null;
277                     }
278 
279                     _state=__ASYNCSTARTED;
280                     break;
281 
282                 default:
283                     throw new IllegalStateException(this.getStatusString());
284             }
285         }
286         
287     }
288 
289     /* ------------------------------------------------------------ */
290     /**
291      * Signal that the HttpConnection has finished handling the request.
292      * For blocking connectors, this call may block if the request has
293      * been suspended (startAsync called).
294      * @return true if handling is complete, false if the request should 
295      * be handled again (eg because of a resume that happened before unhandle was called)
296      */
297     protected boolean unhandle()
298     {
299         synchronized (this)
300         {
301             List<ContinuationListener> listeners=_continuationListeners;
302             
303             switch(_state)
304             {
305                 case __REDISPATCHED:
306                 case __DISPATCHED:
307                     _state=__UNCOMPLETED;
308                     return true;
309 
310                 case __IDLE:
311                     throw new IllegalStateException(this.getStatusString());
312 
313                 case __ASYNCSTARTED:
314                     _initial=false;
315                     _state=__ASYNCWAIT;
316                     scheduleTimeout(); // could block and change state.
317                     if (_state==__ASYNCWAIT)
318                         return true;
319                     else if (_state==__COMPLETING)
320                     {
321                         _state=__UNCOMPLETED;
322                         return true;
323                     }
324                     _initial=false;
325                     _state=__REDISPATCHED;
326                     return false; 
327 
328                 case __REDISPATCHING:
329                     _initial=false;
330                     _state=__REDISPATCHED;
331                     return false; 
332 
333                 case __COMPLETING:
334                     _initial=false;
335                     _state=__UNCOMPLETED;
336                     return true;
337 
338                 default:
339                     throw new IllegalStateException(this.getStatusString());
340             }
341         }
342     }
343 
344     /* ------------------------------------------------------------ */
345     public void dispatch()
346     {
347         boolean dispatch=false;
348         synchronized (this)
349         {
350             switch(_state)
351             {
352                 case __ASYNCSTARTED:
353                     _state=__REDISPATCHING;
354                     _resumed=true;
355                     return;
356 
357                 case __ASYNCWAIT:
358                     dispatch=!_expired;
359                     _state=__REDISPATCH;
360                     _resumed=true;
361                     break;
362                     
363                 case __REDISPATCH:
364                     return;
365                     
366                 default:
367                     throw new IllegalStateException(this.getStatusString());
368             }
369         }
370         
371         if (dispatch)
372         {
373             cancelTimeout();
374             scheduleDispatch();
375         }
376     }
377 
378     /* ------------------------------------------------------------ */
379     protected void expired()
380     {
381         final List<ContinuationListener> listeners;
382         synchronized (this)
383         {
384             switch(_state)
385             {
386                 case __ASYNCSTARTED:
387                 case __ASYNCWAIT:
388                     listeners=_continuationListeners;
389                     break;
390                 default:
391                     listeners=null;
392                     return;
393             }
394             _expired=true;
395         }
396         
397         if (listeners!=null)
398         {
399             for (int i=0;i<listeners.size();i++)
400             {
401                 ContinuationListener listener=listeners.get(i);
402                 try
403                 {
404                     listener.onTimeout(this);
405                 }
406                 catch(Exception e)
407                 {
408                     LOG.warn(e);
409                 }
410             }
411         }
412         
413         synchronized (this)
414         {
415             switch(_state)
416             {
417                 case __ASYNCSTARTED:
418                 case __ASYNCWAIT:
419                     dispatch();
420             }
421         }
422 
423         scheduleDispatch();
424     }
425     
426     /* ------------------------------------------------------------ */
427     /* (non-Javadoc)
428      * @see javax.servlet.ServletRequest#complete()
429      */
430     public void complete()
431     {
432         // just like resume, except don't set _resumed=true;
433         boolean dispatch=false;
434         synchronized (this)
435         {
436             switch(_state)
437             {
438                 case __DISPATCHED:
439                 case __REDISPATCHED:
440                     throw new IllegalStateException(this.getStatusString());
441 
442                 case __ASYNCSTARTED:
443                     _state=__COMPLETING;
444                     return;
445                     
446                 case __ASYNCWAIT:
447                     _state=__COMPLETING;
448                     dispatch=!_expired;
449                     break;
450                     
451                 default:
452                     throw new IllegalStateException(this.getStatusString());
453             }
454         }
455         
456         if (dispatch)
457         {
458             cancelTimeout();
459             scheduleDispatch();
460         }
461     }
462 
463     
464     /* ------------------------------------------------------------ */
465     /* (non-Javadoc)
466      * @see javax.servlet.ServletRequest#complete()
467      */
468     protected void doComplete()
469     {
470         final List<ContinuationListener> listeners;
471         synchronized (this)
472         {
473             switch(_state)
474             {
475                 case __UNCOMPLETED:
476                     _state=__COMPLETED;
477                     listeners=_continuationListeners;
478                     break;
479                     
480                 default:
481                     listeners=null;
482                     throw new IllegalStateException(this.getStatusString());
483             }
484         }
485         
486         if (listeners!=null)
487         {
488             for(int i=0;i<listeners.size();i++)
489             {
490                 try
491                 {
492                     listeners.get(i).onComplete(this);
493                 }
494                 catch(Exception e)
495                 {
496                     LOG.warn(e);
497                 }
498             }
499         }
500     }
501 
502     /* ------------------------------------------------------------ */
503     protected void recycle()
504     {
505         synchronized (this)
506         {
507 //            _history.append("r\n");
508             switch(_state)
509             {
510                 case __DISPATCHED:
511                 case __REDISPATCHED:
512                     throw new IllegalStateException(getStatusString());
513                 default:
514                     _state=__IDLE;
515             }
516             _initial = true;
517             _resumed=false;
518             _expired=false;
519             _responseWrapped=false;
520             cancelTimeout();
521             _timeoutMs=DEFAULT_TIMEOUT;
522             _continuationListeners=null;
523         }
524     }    
525     
526     /* ------------------------------------------------------------ */
527     public void cancel()
528     {
529         synchronized (this)
530         {
531             cancelTimeout();
532             _continuationListeners=null;
533         }
534     }
535 
536     /* ------------------------------------------------------------ */
537     protected void scheduleDispatch()
538     {
539         EndPoint endp=_connection.getEndPoint();
540         if (!endp.isBlocking())
541         {
542             ((AsyncEndPoint)endp).dispatch();
543         }
544     }
545 
546     /* ------------------------------------------------------------ */
547     protected void scheduleTimeout()
548     {
549         EndPoint endp=_connection.getEndPoint();
550         if (_timeoutMs>0)
551         {
552             if (endp.isBlocking())
553             {
554                 synchronized(this)
555                 {
556                     _expireAt = System.currentTimeMillis()+_timeoutMs;
557                     long wait=_timeoutMs;
558                     while (_expireAt>0 && wait>0 && _connection.getServer().isRunning())
559                     {
560                         try
561                         {
562                             this.wait(wait);
563                         }
564                         catch (InterruptedException e)
565                         {
566                             LOG.ignore(e);
567                         }
568                         wait=_expireAt-System.currentTimeMillis();
569                     }
570 
571                     if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning())
572                     {
573                         expired();
574                     }
575                 }            
576             }
577             else
578             {
579                 _connection.scheduleTimeout(_event,_timeoutMs);
580             }
581         }
582     }
583 
584     /* ------------------------------------------------------------ */
585     protected void cancelTimeout()
586     {
587         EndPoint endp=_connection.getEndPoint();
588         if (endp.isBlocking())
589         {
590             synchronized(this)
591             {
592                 _expireAt=0;
593                 this.notifyAll();
594             }
595         }
596         else 
597         {
598             final AsyncEventState event=_event;
599             if (event!=null)
600                 _connection.cancelTimeout(event);
601         }
602     }
603 
604     /* ------------------------------------------------------------ */
605     public boolean isCompleting()
606     {
607         synchronized (this)
608         {
609             return _state==__COMPLETING;
610         }
611     }
612     
613     /* ------------------------------------------------------------ */
614     boolean isUncompleted()
615     {
616         synchronized (this)
617         {
618             return _state==__UNCOMPLETED;
619         }
620     } 
621     
622     /* ------------------------------------------------------------ */
623     public boolean isComplete()
624     {
625         synchronized (this)
626         {
627             return _state==__COMPLETED;
628         }
629     }
630 
631 
632     /* ------------------------------------------------------------ */
633     public boolean isAsyncStarted()
634     {
635         synchronized (this)
636         {
637             switch(_state)
638             {
639                 case __ASYNCSTARTED:
640                 case __REDISPATCHING:
641                 case __REDISPATCH:
642                 case __ASYNCWAIT:
643                     return true;
644 
645                 default:
646                     return false;
647             }
648         }
649     }
650 
651 
652     /* ------------------------------------------------------------ */
653     public boolean isAsync()
654     {
655         synchronized (this)
656         {
657             switch(_state)
658             {
659                 case __IDLE:
660                 case __DISPATCHED:
661                 case __UNCOMPLETED:
662                 case __COMPLETED:
663                     return false;
664 
665                 default:
666                     return true;
667             }
668         }
669     }
670 
671     /* ------------------------------------------------------------ */
672     public void dispatch(ServletContext context, String path)
673     {
674         _event._dispatchContext=context;
675         _event._path=path;
676         dispatch();
677     }
678 
679     /* ------------------------------------------------------------ */
680     public void dispatch(String path)
681     {
682         _event._path=path;
683         dispatch();
684     }
685 
686     /* ------------------------------------------------------------ */
687     public Request getBaseRequest()
688     {
689         return _connection.getRequest();
690     }
691     
692     /* ------------------------------------------------------------ */
693     public ServletRequest getRequest()
694     {
695         if (_event!=null)
696             return _event.getRequest();
697         return _connection.getRequest();
698     }
699 
700     /* ------------------------------------------------------------ */
701     public ServletResponse getResponse()
702     {
703         if (_event!=null)
704             return _event.getResponse();
705         return _connection.getResponse();
706     }
707 
708     /* ------------------------------------------------------------ */
709     public void start(final Runnable run)
710     {
711         final AsyncEventState event=_event;
712         if (event!=null)
713         {
714             _connection.getServer().getThreadPool().dispatch(new Runnable()
715             {
716                 public void run()
717                 {
718                     ((Context)event.getServletContext()).getContextHandler().handle(run);
719                 }
720             });
721         }
722     }
723 
724     /* ------------------------------------------------------------ */
725     public boolean hasOriginalRequestAndResponse()
726     {
727         synchronized (this)
728         {
729             return (_event!=null && _event.getRequest()==_connection._request && _event.getResponse()==_connection._response);
730         }
731     }
732 
733     /* ------------------------------------------------------------ */
734     public ContextHandler getContextHandler()
735     {
736         final AsyncEventState event=_event;
737         if (event!=null)
738             return ((Context)event.getServletContext()).getContextHandler();
739         return null;
740     }
741 
742 
743     /* ------------------------------------------------------------ */
744     /**
745      * @see Continuation#isResumed()
746      */
747     public boolean isResumed()
748     {
749         synchronized (this)
750         {
751             return _resumed;
752         }
753     }
754     /* ------------------------------------------------------------ */
755     /**
756      * @see Continuation#isExpired()
757      */
758     public boolean isExpired()
759     {
760         synchronized (this)
761         {
762             return _expired;
763         }
764     }
765 
766     /* ------------------------------------------------------------ */
767     /**
768      * @see Continuation#resume()
769      */
770     public void resume()
771     {
772         dispatch();
773     }
774     
775     /* ------------------------------------------------------------ */
776     /**
777      * @see Continuation#suspend()
778      */
779     public void suspend(ServletResponse response)
780     {
781         _responseWrapped=!(response instanceof Response);
782         AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response); 
783     }
784 
785     /* ------------------------------------------------------------ */
786     /**
787      * @see Continuation#suspend()
788      */
789     public void suspend()
790     {
791         _responseWrapped=false;
792         AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());       
793     }
794 
795     /* ------------------------------------------------------------ */
796     /**
797      * @see org.eclipse.jetty.continuation.Continuation#getServletResponse()
798      */
799     public ServletResponse getServletResponse()
800     {
801         if (_responseWrapped && _event!=null && _event.getResponse()!=null)
802             return _event.getResponse();
803         return _connection.getResponse();
804     }
805 
806     /* ------------------------------------------------------------ */
807     /**
808      * @see org.eclipse.jetty.continuation.Continuation#getAttribute(java.lang.String)
809      */
810     public Object getAttribute(String name)
811     {
812         return _connection.getRequest().getAttribute(name);
813     }
814 
815     /* ------------------------------------------------------------ */
816     /**
817      * @see org.eclipse.jetty.continuation.Continuation#removeAttribute(java.lang.String)
818      */
819     public void removeAttribute(String name)
820     {
821         _connection.getRequest().removeAttribute(name);
822     }
823 
824     /* ------------------------------------------------------------ */
825     /**
826      * @see org.eclipse.jetty.continuation.Continuation#setAttribute(java.lang.String, java.lang.Object)
827      */
828     public void setAttribute(String name, Object attribute)
829     {
830         _connection.getRequest().setAttribute(name,attribute);
831     }
832 
833     /* ------------------------------------------------------------ */
834     /**
835      * @see org.eclipse.jetty.continuation.Continuation#undispatch()
836      */
837     public void undispatch()
838     {
839         if (isSuspended())
840         {
841             if (LOG.isDebugEnabled())
842                 throw new ContinuationThrowable();
843             else
844                 throw __exception;
845         }
846         throw new IllegalStateException("!suspended");
847     }
848 
849     /* ------------------------------------------------------------ */
850     /* ------------------------------------------------------------ */
851     public class AsyncEventState extends Timeout.Task implements Runnable
852     {
853         private final ServletContext _suspendedContext;
854         private final ServletRequest _request;
855         private final ServletResponse _response;
856         private ServletContext _dispatchContext;
857         private String _path;
858         
859         public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
860         {
861             _suspendedContext=context;
862             _request=request;
863             _response=response;
864         }
865         
866         public ServletContext getSuspendedContext()
867         {
868             return _suspendedContext;
869         }
870         
871         public ServletContext getDispatchContext()
872         {
873             return _dispatchContext;
874         }
875         
876         public ServletContext getServletContext()
877         {
878             return _dispatchContext==null?_suspendedContext:_dispatchContext;
879         }
880 
881         public ServletRequest getRequest()
882         {
883             return _request;
884         }
885 
886         public ServletResponse getResponse()
887         {
888             return _response;
889         }
890         
891         public String getPath()
892         {
893             return _path;
894         }
895 
896         @Override
897         public void expired()
898         {
899             AsyncContinuation.this.expired();
900         }
901         
902         public void run()
903         {
904             AsyncContinuation.this.expired();  
905         }
906     }
907 }