1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.util.ArrayList;
22 import java.util.List;
23
24 import javax.servlet.ServletContext;
25 import javax.servlet.ServletRequest;
26 import javax.servlet.ServletResponse;
27 import javax.servlet.http.HttpServletRequest;
28
29 import org.eclipse.jetty.continuation.Continuation;
30 import org.eclipse.jetty.continuation.ContinuationListener;
31 import org.eclipse.jetty.continuation.ContinuationThrowable;
32 import org.eclipse.jetty.io.AsyncEndPoint;
33 import org.eclipse.jetty.io.EndPoint;
34 import org.eclipse.jetty.server.handler.ContextHandler;
35 import org.eclipse.jetty.server.handler.ContextHandler.Context;
36 import org.eclipse.jetty.util.URIUtil;
37 import org.eclipse.jetty.util.log.Log;
38 import org.eclipse.jetty.util.log.Logger;
39 import org.eclipse.jetty.util.thread.Timeout;
40
41
42
43
44
45 public class AsyncContinuation implements AsyncContext, Continuation
46 {
47 private static final Logger LOG = Log.getLogger(AsyncContinuation.class);
48
49 private final static long DEFAULT_TIMEOUT=30000L;
50
51 private final static ContinuationThrowable __exception = new ContinuationThrowable();
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 private static final int __IDLE=0;
67 private static final int __DISPATCHED=1;
68 private static final int __ASYNCSTARTED=2;
69 private static final int __REDISPATCHING=3;
70 private static final int __ASYNCWAIT=4;
71 private static final int __REDISPATCH=5;
72 private static final int __REDISPATCHED=6;
73 private static final int __COMPLETING=7;
74 private static final int __UNCOMPLETED=8;
75 private static final int __COMPLETED=9;
76
77
78
79 protected AbstractHttpConnection _connection;
80 private List<ContinuationListener> _continuationListeners;
81
82
83 private int _state;
84 private boolean _initial;
85 private boolean _resumed;
86 private boolean _expired;
87 private volatile boolean _responseWrapped;
88 private long _timeoutMs=DEFAULT_TIMEOUT;
89 private AsyncEventState _event;
90 private volatile long _expireAt;
91
92
/**
 * Creates a continuation in the IDLE state that is flagged as initial.
 */
protected AsyncContinuation()
{
    _initial = true;
    _state = __IDLE;
}
98
99
100 protected void setConnection(final AbstractHttpConnection connection)
101 {
102 synchronized(this)
103 {
104 _connection=connection;
105 }
106 }
107
108
109 public void addContinuationListener(ContinuationListener listener)
110 {
111 synchronized(this)
112 {
113 if (_continuationListeners==null)
114 _continuationListeners=new ArrayList<ContinuationListener>();
115 _continuationListeners.add(listener);
116 }
117 }
118
119
120 public void setTimeout(long ms)
121 {
122 synchronized(this)
123 {
124 _timeoutMs=ms;
125 }
126 }
127
128
129 public long getTimeout()
130 {
131 synchronized(this)
132 {
133 return _timeoutMs;
134 }
135 }
136
137
138 public AsyncEventState getAsyncEventState()
139 {
140 synchronized(this)
141 {
142 return _event;
143 }
144 }
145
146
147
148
149
150
151
152
153
154
155 public boolean isResponseWrapped()
156 {
157 return _responseWrapped;
158 }
159
160
161
162
163
164 public boolean isInitial()
165 {
166 synchronized(this)
167 {
168 return _initial;
169 }
170 }
171
172
173
174
175
176 public boolean isSuspended()
177 {
178 synchronized(this)
179 {
180 switch(_state)
181 {
182 case __ASYNCSTARTED:
183 case __REDISPATCHING:
184 case __COMPLETING:
185 case __ASYNCWAIT:
186 return true;
187
188 default:
189 return false;
190 }
191 }
192 }
193
194
195 public boolean isSuspending()
196 {
197 synchronized(this)
198 {
199 switch(_state)
200 {
201 case __ASYNCSTARTED:
202 case __ASYNCWAIT:
203 return true;
204
205 default:
206 return false;
207 }
208 }
209 }
210
211
212 public boolean isDispatchable()
213 {
214 synchronized(this)
215 {
216 switch(_state)
217 {
218 case __REDISPATCH:
219 case __REDISPATCHED:
220 case __REDISPATCHING:
221 case __COMPLETING:
222 return true;
223
224 default:
225 return false;
226 }
227 }
228 }
229
230
231 @Override
232 public String toString()
233 {
234 synchronized (this)
235 {
236 return super.toString()+"@"+getStatusString();
237 }
238 }
239
240
241 public String getStatusString()
242 {
243 synchronized (this)
244 {
245 return
246 ((_state==__IDLE)?"IDLE":
247 (_state==__DISPATCHED)?"DISPATCHED":
248 (_state==__ASYNCSTARTED)?"ASYNCSTARTED":
249 (_state==__ASYNCWAIT)?"ASYNCWAIT":
250 (_state==__REDISPATCHING)?"REDISPATCHING":
251 (_state==__REDISPATCH)?"REDISPATCH":
252 (_state==__REDISPATCHED)?"REDISPATCHED":
253 (_state==__COMPLETING)?"COMPLETING":
254 (_state==__UNCOMPLETED)?"UNCOMPLETED":
255 (_state==__COMPLETED)?"COMPLETE":
256 ("UNKNOWN?"+_state))+
257 (_initial?",initial":"")+
258 (_resumed?",resumed":"")+
259 (_expired?",expired":"");
260 }
261 }
262
263
264
265
266
267 protected boolean handling()
268 {
269 synchronized (this)
270 {
271 _responseWrapped=false;
272
273 switch(_state)
274 {
275 case __IDLE:
276 _initial=true;
277 _state=__DISPATCHED;
278 return true;
279
280 case __COMPLETING:
281 _state=__UNCOMPLETED;
282 return false;
283
284 case __ASYNCWAIT:
285 return false;
286
287 case __REDISPATCH:
288 _state=__REDISPATCHED;
289 return true;
290
291 default:
292 throw new IllegalStateException(this.getStatusString());
293 }
294 }
295 }
296
297
298
299
300
301 private void doSuspend(final ServletContext context,
302 final ServletRequest request,
303 final ServletResponse response)
304 {
305 synchronized (this)
306 {
307 switch(_state)
308 {
309 case __DISPATCHED:
310 case __REDISPATCHED:
311 _resumed=false;
312 _expired=false;
313
314 if (_event==null || request!=_event.getRequest() || response != _event.getResponse() || context != _event.getServletContext())
315 _event=new AsyncEventState(context,request,response);
316 else
317 {
318 _event._dispatchContext=null;
319 _event._pathInContext=null;
320 }
321 _state=__ASYNCSTARTED;
322 break;
323
324 default:
325 throw new IllegalStateException(this.getStatusString());
326 }
327 }
328 }
329
330
331
332
333
334
335
336
337
338 protected boolean unhandle()
339 {
340 synchronized (this)
341 {
342 List<ContinuationListener> listeners=_continuationListeners;
343
344 switch(_state)
345 {
346 case __REDISPATCHED:
347 case __DISPATCHED:
348 _state=__UNCOMPLETED;
349 return true;
350
351 case __IDLE:
352 throw new IllegalStateException(this.getStatusString());
353
354 case __ASYNCSTARTED:
355 _initial=false;
356 _state=__ASYNCWAIT;
357 scheduleTimeout();
358 if (_state==__ASYNCWAIT)
359 return true;
360 else if (_state==__COMPLETING)
361 {
362 _state=__UNCOMPLETED;
363 return true;
364 }
365 _initial=false;
366 _state=__REDISPATCHED;
367 return false;
368
369 case __REDISPATCHING:
370 _initial=false;
371 _state=__REDISPATCHED;
372 return false;
373
374 case __COMPLETING:
375 _initial=false;
376 _state=__UNCOMPLETED;
377 return true;
378
379 default:
380 throw new IllegalStateException(this.getStatusString());
381 }
382 }
383 }
384
385
386 public void dispatch()
387 {
388 boolean dispatch=false;
389 synchronized (this)
390 {
391 switch(_state)
392 {
393 case __ASYNCSTARTED:
394 _state=__REDISPATCHING;
395 _resumed=true;
396 return;
397
398 case __ASYNCWAIT:
399 dispatch=!_expired;
400 _state=__REDISPATCH;
401 _resumed=true;
402 break;
403
404 case __REDISPATCH:
405 return;
406
407 default:
408 throw new IllegalStateException(this.getStatusString());
409 }
410 }
411
412 if (dispatch)
413 {
414 cancelTimeout();
415 scheduleDispatch();
416 }
417 }
418
419
420 protected void expired()
421 {
422 final List<ContinuationListener> listeners;
423 synchronized (this)
424 {
425 switch(_state)
426 {
427 case __ASYNCSTARTED:
428 case __ASYNCWAIT:
429 listeners=_continuationListeners;
430 break;
431 default:
432 listeners=null;
433 return;
434 }
435 _expired=true;
436 }
437
438 if (listeners!=null)
439 {
440 for (int i=0;i<listeners.size();i++)
441 {
442 ContinuationListener listener=listeners.get(i);
443 try
444 {
445 listener.onTimeout(this);
446 }
447 catch(Exception e)
448 {
449 LOG.warn(e);
450 }
451 }
452 }
453
454 synchronized (this)
455 {
456 switch(_state)
457 {
458 case __ASYNCSTARTED:
459 case __ASYNCWAIT:
460 dispatch();
461 }
462 }
463
464 scheduleDispatch();
465 }
466
467
468
469
470
471 public void complete()
472 {
473
474 boolean dispatch=false;
475 synchronized (this)
476 {
477 switch(_state)
478 {
479 case __DISPATCHED:
480 case __REDISPATCHED:
481 throw new IllegalStateException(this.getStatusString());
482
483 case __ASYNCSTARTED:
484 _state=__COMPLETING;
485 return;
486
487 case __ASYNCWAIT:
488 _state=__COMPLETING;
489 dispatch=!_expired;
490 break;
491
492 default:
493 throw new IllegalStateException(this.getStatusString());
494 }
495 }
496
497 if (dispatch)
498 {
499 cancelTimeout();
500 scheduleDispatch();
501 }
502 }
503
504
505
506
507
508
509 protected void doComplete()
510 {
511 final List<ContinuationListener> listeners;
512 synchronized (this)
513 {
514 switch(_state)
515 {
516 case __UNCOMPLETED:
517 _state=__COMPLETED;
518 listeners=_continuationListeners;
519 break;
520
521 default:
522 listeners=null;
523 throw new IllegalStateException(this.getStatusString());
524 }
525 }
526
527 if (listeners!=null)
528 {
529 for(int i=0;i<listeners.size();i++)
530 {
531 try
532 {
533 listeners.get(i).onComplete(this);
534 }
535 catch(Exception e)
536 {
537 LOG.warn(e);
538 }
539 }
540 }
541 }
542
543
544 protected void recycle()
545 {
546 synchronized (this)
547 {
548
549 switch(_state)
550 {
551 case __DISPATCHED:
552 case __REDISPATCHED:
553 throw new IllegalStateException(getStatusString());
554 default:
555 _state=__IDLE;
556 }
557 _initial = true;
558 _resumed=false;
559 _expired=false;
560 _responseWrapped=false;
561 cancelTimeout();
562 _timeoutMs=DEFAULT_TIMEOUT;
563 _continuationListeners=null;
564 }
565 }
566
567
568 public void cancel()
569 {
570 synchronized (this)
571 {
572 cancelTimeout();
573 _continuationListeners=null;
574 }
575 }
576
577
578 protected void scheduleDispatch()
579 {
580 EndPoint endp=_connection.getEndPoint();
581 if (!endp.isBlocking())
582 {
583 ((AsyncEndPoint)endp).asyncDispatch();
584 }
585 }
586
587
588 protected void scheduleTimeout()
589 {
590 EndPoint endp=_connection.getEndPoint();
591 if (_timeoutMs>0)
592 {
593 if (endp.isBlocking())
594 {
595 synchronized(this)
596 {
597 _expireAt = System.currentTimeMillis()+_timeoutMs;
598 long wait=_timeoutMs;
599 while (_expireAt>0 && wait>0 && _connection.getServer().isRunning())
600 {
601 try
602 {
603 this.wait(wait);
604 }
605 catch (InterruptedException e)
606 {
607 LOG.ignore(e);
608 }
609 wait=_expireAt-System.currentTimeMillis();
610 }
611
612 if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning())
613 {
614 expired();
615 }
616 }
617 }
618 else
619 {
620 ((AsyncEndPoint)endp).scheduleTimeout(_event,_timeoutMs);
621 }
622 }
623 }
624
625
626 protected void cancelTimeout()
627 {
628 EndPoint endp=_connection.getEndPoint();
629 if (endp.isBlocking())
630 {
631 synchronized(this)
632 {
633 _expireAt=0;
634 this.notifyAll();
635 }
636 }
637 else
638 {
639 final AsyncEventState event=_event;
640 if (event!=null)
641 {
642 ((AsyncEndPoint)endp).cancelTimeout(event);
643 }
644 }
645 }
646
647
648 public boolean isCompleting()
649 {
650 synchronized (this)
651 {
652 return _state==__COMPLETING;
653 }
654 }
655
656
657 boolean isUncompleted()
658 {
659 synchronized (this)
660 {
661 return _state==__UNCOMPLETED;
662 }
663 }
664
665
666 public boolean isComplete()
667 {
668 synchronized (this)
669 {
670 return _state==__COMPLETED;
671 }
672 }
673
674
675
676 public boolean isAsyncStarted()
677 {
678 synchronized (this)
679 {
680 switch(_state)
681 {
682 case __ASYNCSTARTED:
683 case __REDISPATCHING:
684 case __REDISPATCH:
685 case __ASYNCWAIT:
686 return true;
687
688 default:
689 return false;
690 }
691 }
692 }
693
694
695
696 public boolean isAsync()
697 {
698 synchronized (this)
699 {
700 switch(_state)
701 {
702 case __IDLE:
703 case __DISPATCHED:
704 case __UNCOMPLETED:
705 case __COMPLETED:
706 return false;
707
708 default:
709 return true;
710 }
711 }
712 }
713
714
715 public void dispatch(ServletContext context, String path)
716 {
717 _event._dispatchContext=context;
718 _event._pathInContext=path;
719 dispatch();
720 }
721
722
723 public void dispatch(String path)
724 {
725 _event._pathInContext=path;
726 dispatch();
727 }
728
729
730 public Request getBaseRequest()
731 {
732 return _connection.getRequest();
733 }
734
735
736 public ServletRequest getRequest()
737 {
738 if (_event!=null)
739 return _event.getRequest();
740 return _connection.getRequest();
741 }
742
743
744 public ServletResponse getResponse()
745 {
746 if (_event!=null)
747 return _event.getResponse();
748 return _connection.getResponse();
749 }
750
751
752 public void start(final Runnable run)
753 {
754 final AsyncEventState event=_event;
755 if (event!=null)
756 {
757 _connection.getServer().getThreadPool().dispatch(new Runnable()
758 {
759 public void run()
760 {
761 ((Context)event.getServletContext()).getContextHandler().handle(run);
762 }
763 });
764 }
765 }
766
767
768 public boolean hasOriginalRequestAndResponse()
769 {
770 synchronized (this)
771 {
772 return (_event!=null && _event.getRequest()==_connection._request && _event.getResponse()==_connection._response);
773 }
774 }
775
776
777 public ContextHandler getContextHandler()
778 {
779 final AsyncEventState event=_event;
780 if (event!=null)
781 return ((Context)event.getServletContext()).getContextHandler();
782 return null;
783 }
784
785
786
787
788
789
790 public boolean isResumed()
791 {
792 synchronized (this)
793 {
794 return _resumed;
795 }
796 }
797
798
799
800
801 public boolean isExpired()
802 {
803 synchronized (this)
804 {
805 return _expired;
806 }
807 }
808
809
810
811
812
813 public void resume()
814 {
815 dispatch();
816 }
817
818
819
820
821 protected void suspend(final ServletContext context,
822 final ServletRequest request,
823 final ServletResponse response)
824 {
825 synchronized (this)
826 {
827 _responseWrapped=!(response instanceof Response);
828 doSuspend(context,request,response);
829 if (request instanceof HttpServletRequest)
830 {
831 _event._pathInContext = URIUtil.addPaths(((HttpServletRequest)request).getServletPath(),((HttpServletRequest)request).getPathInfo());
832 }
833 }
834 }
835
836
837
838
839
840
841 public void suspend(ServletResponse response)
842 {
843 _responseWrapped=!(response instanceof Response);
844 doSuspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response);
845 }
846
847
848
849
850
851 public void suspend()
852 {
853 _responseWrapped=false;
854 doSuspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());
855 }
856
857
858
859
860
861 public ServletResponse getServletResponse()
862 {
863 if (_responseWrapped && _event!=null && _event.getResponse()!=null)
864 return _event.getResponse();
865 return _connection.getResponse();
866 }
867
868
869
870
871
872 public Object getAttribute(String name)
873 {
874 return _connection.getRequest().getAttribute(name);
875 }
876
877
878
879
880
881 public void removeAttribute(String name)
882 {
883 _connection.getRequest().removeAttribute(name);
884 }
885
886
887
888
889
890 public void setAttribute(String name, Object attribute)
891 {
892 _connection.getRequest().setAttribute(name,attribute);
893 }
894
895
896
897
898
899 public void undispatch()
900 {
901 if (isSuspended())
902 {
903 if (LOG.isDebugEnabled())
904 throw new ContinuationThrowable();
905 else
906 throw __exception;
907 }
908 throw new IllegalStateException("!suspended");
909 }
910
911
912
913 public class AsyncEventState extends Timeout.Task implements Runnable
914 {
915 private final ServletContext _suspendedContext;
916 private final ServletRequest _request;
917 private final ServletResponse _response;
918 private ServletContext _dispatchContext;
919 private String _pathInContext;
920
921 public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
922 {
923 _suspendedContext=context;
924 _request=request;
925 _response=response;
926
927
928
929 Request r=_connection.getRequest();
930
931
932 if (r.getAttribute(AsyncContext.ASYNC_REQUEST_URI)==null)
933 {
934
935
936
937
938 String uri=(String)r.getAttribute(Dispatcher.FORWARD_REQUEST_URI);
939 if (uri!=null)
940 {
941 r.setAttribute(AsyncContext.ASYNC_REQUEST_URI,uri);
942 r.setAttribute(AsyncContext.ASYNC_CONTEXT_PATH,r.getAttribute(Dispatcher.FORWARD_CONTEXT_PATH));
943 r.setAttribute(AsyncContext.ASYNC_SERVLET_PATH,r.getAttribute(Dispatcher.FORWARD_SERVLET_PATH));
944 r.setAttribute(AsyncContext.ASYNC_PATH_INFO,r.getAttribute(Dispatcher.FORWARD_PATH_INFO));
945 r.setAttribute(AsyncContext.ASYNC_QUERY_STRING,r.getAttribute(Dispatcher.FORWARD_QUERY_STRING));
946 }
947 else
948 {
949 r.setAttribute(AsyncContext.ASYNC_REQUEST_URI,r.getRequestURI());
950 r.setAttribute(AsyncContext.ASYNC_CONTEXT_PATH,r.getContextPath());
951 r.setAttribute(AsyncContext.ASYNC_SERVLET_PATH,r.getServletPath());
952 r.setAttribute(AsyncContext.ASYNC_PATH_INFO,r.getPathInfo());
953 r.setAttribute(AsyncContext.ASYNC_QUERY_STRING,r.getQueryString());
954 }
955 }
956 }
957
958 public ServletContext getSuspendedContext()
959 {
960 return _suspendedContext;
961 }
962
963 public ServletContext getDispatchContext()
964 {
965 return _dispatchContext;
966 }
967
968 public ServletContext getServletContext()
969 {
970 return _dispatchContext==null?_suspendedContext:_dispatchContext;
971 }
972
973 public ServletRequest getRequest()
974 {
975 return _request;
976 }
977
978 public ServletResponse getResponse()
979 {
980 return _response;
981 }
982
983
984
985
986
987 public String getPath()
988 {
989 return _pathInContext;
990 }
991
992 @Override
993 public void expired()
994 {
995 AsyncContinuation.this.expired();
996 }
997
998 public void run()
999 {
1000 AsyncContinuation.this.expired();
1001 }
1002 }
1003 }