1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import javax.servlet.AsyncContext;
22 import javax.servlet.AsyncEvent;
23 import javax.servlet.AsyncListener;
24 import javax.servlet.ServletResponseWrapper;
25 import javax.servlet.ServletException;
26
27 import java.util.ArrayList;
28 import java.util.List;
29
30 import javax.servlet.ServletContext;
31 import javax.servlet.ServletRequest;
32 import javax.servlet.ServletResponse;
33 import javax.servlet.http.HttpServletRequest;
34
35 import org.eclipse.jetty.continuation.Continuation;
36 import org.eclipse.jetty.continuation.ContinuationThrowable;
37 import org.eclipse.jetty.continuation.ContinuationListener;
38 import org.eclipse.jetty.io.AsyncEndPoint;
39 import org.eclipse.jetty.io.EndPoint;
40 import org.eclipse.jetty.server.handler.ContextHandler;
41 import org.eclipse.jetty.server.handler.ContextHandler.Context;
42 import org.eclipse.jetty.util.URIUtil;
43 import org.eclipse.jetty.util.log.Log;
44 import org.eclipse.jetty.util.log.Logger;
45 import org.eclipse.jetty.util.thread.Timeout;
46
47
48
49
50
51 public class AsyncContinuation implements AsyncContext, Continuation
52 {
53 private static final Logger LOG = Log.getLogger(AsyncContinuation.class);
54
55 private final static long DEFAULT_TIMEOUT=30000L;
56
57 private final static ContinuationThrowable __exception = new ContinuationThrowable();
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72 private static final int __IDLE=0;
73 private static final int __DISPATCHED=1;
74 private static final int __ASYNCSTARTED=2;
75 private static final int __REDISPATCHING=3;
76 private static final int __ASYNCWAIT=4;
77 private static final int __REDISPATCH=5;
78 private static final int __REDISPATCHED=6;
79 private static final int __COMPLETING=7;
80 private static final int __UNCOMPLETED=8;
81 private static final int __COMPLETED=9;
82
83
84 protected AbstractHttpConnection _connection;
85 private List<AsyncListener> _lastAsyncListeners;
86 private List<AsyncListener> _asyncListeners;
87 private List<ContinuationListener> _continuationListeners;
88
89
90 private int _state;
91 private boolean _initial;
92 private boolean _resumed;
93 private boolean _expired;
94 private volatile boolean _responseWrapped;
95 private long _timeoutMs=DEFAULT_TIMEOUT;
96 private AsyncEventState _event;
97 private volatile long _expireAt;
98 private volatile boolean _continuation;
99
100
101 protected AsyncContinuation()
102 {
103 _state=__IDLE;
104 _initial=true;
105 }
106
107
108 protected void setConnection(final AbstractHttpConnection connection)
109 {
110 synchronized(this)
111 {
112 _connection=connection;
113 }
114 }
115
116
117 public void addListener(AsyncListener listener)
118 {
119 synchronized(this)
120 {
121 if (_asyncListeners==null)
122 _asyncListeners=new ArrayList<AsyncListener>();
123 _asyncListeners.add(listener);
124 }
125 }
126
127
128 public void addListener(AsyncListener listener,ServletRequest request, ServletResponse response)
129 {
130 synchronized(this)
131 {
132
133 if (_asyncListeners==null)
134 _asyncListeners=new ArrayList<AsyncListener>();
135 _asyncListeners.add(listener);
136 }
137 }
138
139
140 public void addContinuationListener(ContinuationListener listener)
141 {
142 synchronized(this)
143 {
144 if (_continuationListeners==null)
145 _continuationListeners=new ArrayList<ContinuationListener>();
146 _continuationListeners.add(listener);
147 }
148 }
149
150
151 public void setTimeout(long ms)
152 {
153 synchronized(this)
154 {
155 _timeoutMs=ms;
156 }
157 }
158
159
160 public long getTimeout()
161 {
162 synchronized(this)
163 {
164 return _timeoutMs;
165 }
166 }
167
168
169 public AsyncEventState getAsyncEventState()
170 {
171 synchronized(this)
172 {
173 return _event;
174 }
175 }
176
177
178
179
180
181
182
183
184
185
186 public boolean isResponseWrapped()
187 {
188 return _responseWrapped;
189 }
190
191
192
193
194
195 public boolean isInitial()
196 {
197 synchronized(this)
198 {
199 return _initial;
200 }
201 }
202
203
204
205
206
207 public boolean isSuspended()
208 {
209 synchronized(this)
210 {
211 switch(_state)
212 {
213 case __ASYNCSTARTED:
214 case __REDISPATCHING:
215 case __COMPLETING:
216 case __ASYNCWAIT:
217 return true;
218
219 default:
220 return false;
221 }
222 }
223 }
224
225
226 public boolean isSuspending()
227 {
228 synchronized(this)
229 {
230 switch(_state)
231 {
232 case __ASYNCSTARTED:
233 case __ASYNCWAIT:
234 return true;
235
236 default:
237 return false;
238 }
239 }
240 }
241
242
243 public boolean isDispatchable()
244 {
245 synchronized(this)
246 {
247 switch(_state)
248 {
249 case __REDISPATCH:
250 case __REDISPATCHED:
251 case __REDISPATCHING:
252 case __COMPLETING:
253 return true;
254
255 default:
256 return false;
257 }
258 }
259 }
260
261
262 @Override
263 public String toString()
264 {
265 synchronized (this)
266 {
267 return super.toString()+"@"+getStatusString();
268 }
269 }
270
271
272 public String getStatusString()
273 {
274 synchronized (this)
275 {
276 return
277 ((_state==__IDLE)?"IDLE":
278 (_state==__DISPATCHED)?"DISPATCHED":
279 (_state==__ASYNCSTARTED)?"ASYNCSTARTED":
280 (_state==__ASYNCWAIT)?"ASYNCWAIT":
281 (_state==__REDISPATCHING)?"REDISPATCHING":
282 (_state==__REDISPATCH)?"REDISPATCH":
283 (_state==__REDISPATCHED)?"REDISPATCHED":
284 (_state==__COMPLETING)?"COMPLETING":
285 (_state==__UNCOMPLETED)?"UNCOMPLETED":
286 (_state==__COMPLETED)?"COMPLETE":
287 ("UNKNOWN?"+_state))+
288 (_initial?",initial":"")+
289 (_resumed?",resumed":"")+
290 (_expired?",expired":"");
291 }
292 }
293
294
295
296
297
298 protected boolean handling()
299 {
300 synchronized (this)
301 {
302 _continuation=false;
303 _responseWrapped=false;
304
305 switch(_state)
306 {
307 case __IDLE:
308 _initial=true;
309 _state=__DISPATCHED;
310 if (_lastAsyncListeners!=null)
311 _lastAsyncListeners.clear();
312 if (_asyncListeners!=null)
313 _asyncListeners.clear();
314 else
315 {
316 _asyncListeners=_lastAsyncListeners;
317 _lastAsyncListeners=null;
318 }
319 return true;
320
321 case __COMPLETING:
322 _state=__UNCOMPLETED;
323 return false;
324
325 case __ASYNCWAIT:
326 return false;
327
328 case __REDISPATCH:
329 _state=__REDISPATCHED;
330 return true;
331
332 default:
333 throw new IllegalStateException(this.getStatusString());
334 }
335 }
336 }
337
338
339
340
341
342 private void doSuspend(final ServletContext context,
343 final ServletRequest request,
344 final ServletResponse response)
345 {
346 synchronized (this)
347 {
348 switch(_state)
349 {
350 case __DISPATCHED:
351 case __REDISPATCHED:
352 _resumed=false;
353 _expired=false;
354
355 if (_event==null || request!=_event.getSuppliedRequest() || response != _event.getSuppliedResponse() || context != _event.getServletContext())
356 _event=new AsyncEventState(context,request,response);
357 else
358 {
359 _event._dispatchContext=null;
360 _event._pathInContext=null;
361 }
362 _state=__ASYNCSTARTED;
363 List<AsyncListener> recycle=_lastAsyncListeners;
364 _lastAsyncListeners=_asyncListeners;
365 _asyncListeners=recycle;
366 if (_asyncListeners!=null)
367 _asyncListeners.clear();
368 break;
369
370 default:
371 throw new IllegalStateException(this.getStatusString());
372 }
373 }
374
375 if (_lastAsyncListeners!=null)
376 {
377 for (AsyncListener listener : _lastAsyncListeners)
378 {
379 try
380 {
381 listener.onStartAsync(_event);
382 }
383 catch(Exception e)
384 {
385 LOG.warn(e);
386 }
387 }
388 }
389 }
390
391
392
393
394
395
396
397
398
399 protected boolean unhandle()
400 {
401 synchronized (this)
402 {
403 switch(_state)
404 {
405 case __REDISPATCHED:
406 case __DISPATCHED:
407 _state=__UNCOMPLETED;
408 return true;
409
410 case __IDLE:
411 throw new IllegalStateException(this.getStatusString());
412
413 case __ASYNCSTARTED:
414 _initial=false;
415 _state=__ASYNCWAIT;
416 scheduleTimeout();
417 if (_state==__ASYNCWAIT)
418 return true;
419 else if (_state==__COMPLETING)
420 {
421 _state=__UNCOMPLETED;
422 return true;
423 }
424 _initial=false;
425 _state=__REDISPATCHED;
426 return false;
427
428 case __REDISPATCHING:
429 _initial=false;
430 _state=__REDISPATCHED;
431 return false;
432
433 case __COMPLETING:
434 _initial=false;
435 _state=__UNCOMPLETED;
436 return true;
437
438 default:
439 throw new IllegalStateException(this.getStatusString());
440 }
441 }
442 }
443
444
445 public void dispatch()
446 {
447 boolean dispatch=false;
448 synchronized (this)
449 {
450 switch(_state)
451 {
452 case __ASYNCSTARTED:
453 _state=__REDISPATCHING;
454 _resumed=true;
455 return;
456
457 case __ASYNCWAIT:
458 dispatch=!_expired;
459 _state=__REDISPATCH;
460 _resumed=true;
461 break;
462
463 case __REDISPATCH:
464 return;
465
466 default:
467 throw new IllegalStateException(this.getStatusString());
468 }
469 }
470
471 if (dispatch)
472 {
473 cancelTimeout();
474 scheduleDispatch();
475 }
476 }
477
478
479 protected void expired()
480 {
481 final List<ContinuationListener> cListeners;
482 final List<AsyncListener> aListeners;
483 synchronized (this)
484 {
485 switch(_state)
486 {
487 case __ASYNCSTARTED:
488 case __ASYNCWAIT:
489 cListeners=_continuationListeners;
490 aListeners=_asyncListeners;
491 break;
492 default:
493 cListeners=null;
494 aListeners=null;
495 return;
496 }
497 _expired=true;
498 }
499
500 if (aListeners!=null)
501 {
502 for (AsyncListener listener : aListeners)
503 {
504 try
505 {
506 listener.onTimeout(_event);
507 }
508 catch(Exception e)
509 {
510 LOG.warn(e);
511 }
512 }
513 }
514 if (cListeners!=null)
515 {
516 for (ContinuationListener listener : cListeners)
517 {
518 try
519 {
520 listener.onTimeout(this);
521 }
522 catch(Exception e)
523 {
524 LOG.warn(e);
525 }
526 }
527 }
528
529
530
531 synchronized (this)
532 {
533 switch(_state)
534 {
535 case __ASYNCSTARTED:
536 case __ASYNCWAIT:
537 if (_continuation)
538 dispatch();
539 else
540
541 complete();
542 }
543 }
544
545 scheduleDispatch();
546 }
547
548
549
550
551
552 public void complete()
553 {
554
555 boolean dispatch=false;
556 synchronized (this)
557 {
558 switch(_state)
559 {
560 case __DISPATCHED:
561 case __REDISPATCHED:
562 throw new IllegalStateException(this.getStatusString());
563
564 case __ASYNCSTARTED:
565 _state=__COMPLETING;
566 return;
567
568 case __ASYNCWAIT:
569 _state=__COMPLETING;
570 dispatch=!_expired;
571 break;
572
573 default:
574 throw new IllegalStateException(this.getStatusString());
575 }
576 }
577
578 if (dispatch)
579 {
580 cancelTimeout();
581 scheduleDispatch();
582 }
583 }
584
585
586 @Override
587 public <T extends AsyncListener> T createListener(Class<T> clazz) throws ServletException
588 {
589 try
590 {
591
592 return clazz.newInstance();
593 }
594 catch(Exception e)
595 {
596 throw new ServletException(e);
597 }
598 }
599
600
601
602
603
604
605 protected void doComplete(Throwable ex)
606 {
607 final List<ContinuationListener> cListeners;
608 final List<AsyncListener> aListeners;
609 synchronized (this)
610 {
611 switch(_state)
612 {
613 case __UNCOMPLETED:
614 _state=__COMPLETED;
615 cListeners=_continuationListeners;
616 aListeners=_asyncListeners;
617 break;
618
619 default:
620 cListeners=null;
621 aListeners=null;
622 throw new IllegalStateException(this.getStatusString());
623 }
624 }
625
626 if (aListeners!=null)
627 {
628 for (AsyncListener listener : aListeners)
629 {
630 try
631 {
632 if (ex!=null)
633 {
634 _event.getSuppliedRequest().setAttribute(Dispatcher.ERROR_EXCEPTION,ex);
635 _event.getSuppliedRequest().setAttribute(Dispatcher.ERROR_MESSAGE,ex.getMessage());
636 listener.onError(_event);
637 }
638 else
639 listener.onComplete(_event);
640 }
641 catch(Exception e)
642 {
643 LOG.warn(e);
644 }
645 }
646 }
647 if (cListeners!=null)
648 {
649 for (ContinuationListener listener : cListeners)
650 {
651 try
652 {
653 listener.onComplete(this);
654 }
655 catch(Exception e)
656 {
657 LOG.warn(e);
658 }
659 }
660 }
661 }
662
663
664 protected void recycle()
665 {
666 synchronized (this)
667 {
668 switch(_state)
669 {
670 case __DISPATCHED:
671 case __REDISPATCHED:
672 throw new IllegalStateException(getStatusString());
673 default:
674 _state=__IDLE;
675 }
676 _initial = true;
677 _resumed=false;
678 _expired=false;
679 _responseWrapped=false;
680 cancelTimeout();
681 _timeoutMs=DEFAULT_TIMEOUT;
682 _continuationListeners=null;
683 }
684 }
685
686
687 public void cancel()
688 {
689 synchronized (this)
690 {
691 cancelTimeout();
692 _continuationListeners=null;
693 }
694 }
695
696
697 protected void scheduleDispatch()
698 {
699 EndPoint endp=_connection.getEndPoint();
700 if (!endp.isBlocking())
701 {
702 ((AsyncEndPoint)endp).asyncDispatch();
703 }
704 }
705
706
707 protected void scheduleTimeout()
708 {
709 EndPoint endp=_connection.getEndPoint();
710 if (_timeoutMs>0)
711 {
712 if (endp.isBlocking())
713 {
714 synchronized(this)
715 {
716 _expireAt = System.currentTimeMillis()+_timeoutMs;
717 long wait=_timeoutMs;
718 while (_expireAt>0 && wait>0 && _connection.getServer().isRunning())
719 {
720 try
721 {
722 this.wait(wait);
723 }
724 catch (InterruptedException e)
725 {
726 LOG.ignore(e);
727 }
728 wait=_expireAt-System.currentTimeMillis();
729 }
730
731 if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning())
732 {
733 expired();
734 }
735 }
736 }
737 else
738 {
739 ((AsyncEndPoint)endp).scheduleTimeout(_event._timeout,_timeoutMs);
740 }
741 }
742 }
743
744
745 protected void cancelTimeout()
746 {
747 EndPoint endp=_connection.getEndPoint();
748 if (endp.isBlocking())
749 {
750 synchronized(this)
751 {
752 _expireAt=0;
753 this.notifyAll();
754 }
755 }
756 else
757 {
758 final AsyncEventState event=_event;
759 if (event!=null)
760 {
761 ((AsyncEndPoint)endp).cancelTimeout(event._timeout);
762 }
763 }
764 }
765
766
767 public boolean isCompleting()
768 {
769 synchronized (this)
770 {
771 return _state==__COMPLETING;
772 }
773 }
774
775
776 boolean isUncompleted()
777 {
778 synchronized (this)
779 {
780 return _state==__UNCOMPLETED;
781 }
782 }
783
784
785 public boolean isComplete()
786 {
787 synchronized (this)
788 {
789 return _state==__COMPLETED;
790 }
791 }
792
793
794
795 public boolean isAsyncStarted()
796 {
797 synchronized (this)
798 {
799 switch(_state)
800 {
801 case __ASYNCSTARTED:
802 case __REDISPATCHING:
803 case __REDISPATCH:
804 case __ASYNCWAIT:
805 return true;
806
807 default:
808 return false;
809 }
810 }
811 }
812
813
814
815 public boolean isAsync()
816 {
817 synchronized (this)
818 {
819 switch(_state)
820 {
821 case __IDLE:
822 case __DISPATCHED:
823 case __UNCOMPLETED:
824 case __COMPLETED:
825 return false;
826
827 default:
828 return true;
829 }
830 }
831 }
832
833
834 public void dispatch(ServletContext context, String path)
835 {
836 _event._dispatchContext=context;
837 _event._pathInContext=path;
838 dispatch();
839 }
840
841
842 public void dispatch(String path)
843 {
844 _event._pathInContext=path;
845 dispatch();
846 }
847
848
849 public Request getBaseRequest()
850 {
851 return _connection.getRequest();
852 }
853
854
855 public ServletRequest getRequest()
856 {
857 if (_event!=null)
858 return _event.getSuppliedRequest();
859 return _connection.getRequest();
860 }
861
862
863 public ServletResponse getResponse()
864 {
865 if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null)
866 return _event.getSuppliedResponse();
867 return _connection.getResponse();
868 }
869
870
871 public void start(final Runnable run)
872 {
873 final AsyncEventState event=_event;
874 if (event!=null)
875 {
876 _connection.getServer().getThreadPool().dispatch(new Runnable()
877 {
878 public void run()
879 {
880 ((Context)event.getServletContext()).getContextHandler().handle(run);
881 }
882 });
883 }
884 }
885
886
887 public boolean hasOriginalRequestAndResponse()
888 {
889 synchronized (this)
890 {
891 return (_event!=null && _event.getSuppliedRequest()==_connection._request && _event.getSuppliedResponse()==_connection._response);
892 }
893 }
894
895
896 public ContextHandler getContextHandler()
897 {
898 final AsyncEventState event=_event;
899 if (event!=null)
900 return ((Context)event.getServletContext()).getContextHandler();
901 return null;
902 }
903
904
905
906
907
908
909 public boolean isResumed()
910 {
911 synchronized (this)
912 {
913 return _resumed;
914 }
915 }
916
917
918
919
920 public boolean isExpired()
921 {
922 synchronized (this)
923 {
924 return _expired;
925 }
926 }
927
928
929
930
931
932 public void resume()
933 {
934 dispatch();
935 }
936
937
938
939
940 protected void suspend(final ServletContext context,
941 final ServletRequest request,
942 final ServletResponse response)
943 {
944 synchronized (this)
945 {
946 _responseWrapped=!(response instanceof Response);
947 doSuspend(context,request,response);
948 if (request instanceof HttpServletRequest)
949 {
950 _event._pathInContext = URIUtil.addPaths(((HttpServletRequest)request).getServletPath(),((HttpServletRequest)request).getPathInfo());
951 }
952 }
953 }
954
955
956
957
958
959
960 public void suspend(ServletResponse response)
961 {
962 _continuation=true;
963 _responseWrapped=!(response instanceof Response);
964 doSuspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response);
965 }
966
967
968
969
970
971 public void suspend()
972 {
973 _responseWrapped=false;
974 _continuation=true;
975 doSuspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());
976 }
977
978
979
980
981
982 public ServletResponse getServletResponse()
983 {
984 if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null)
985 return _event.getSuppliedResponse();
986 return _connection.getResponse();
987 }
988
989
990
991
992
993 public Object getAttribute(String name)
994 {
995 return _connection.getRequest().getAttribute(name);
996 }
997
998
999
1000
1001
1002 public void removeAttribute(String name)
1003 {
1004 _connection.getRequest().removeAttribute(name);
1005 }
1006
1007
1008
1009
1010
1011 public void setAttribute(String name, Object attribute)
1012 {
1013 _connection.getRequest().setAttribute(name,attribute);
1014 }
1015
1016
1017
1018
1019
1020 public void undispatch()
1021 {
1022 if (isSuspended())
1023 {
1024 if (LOG.isDebugEnabled())
1025 throw new ContinuationThrowable();
1026 else
1027 throw __exception;
1028 }
1029 throw new IllegalStateException("!suspended");
1030 }
1031
1032
1033
1034 public class AsyncTimeout extends Timeout.Task implements Runnable
1035 {
1036 @Override
1037 public void expired()
1038 {
1039 AsyncContinuation.this.expired();
1040 }
1041
1042 @Override
1043 public void run()
1044 {
1045 AsyncContinuation.this.expired();
1046 }
1047 }
1048
1049
1050
1051 public class AsyncEventState extends AsyncEvent
1052 {
1053 private final ServletContext _suspendedContext;
1054 private ServletContext _dispatchContext;
1055 private String _pathInContext;
1056 private Timeout.Task _timeout= new AsyncTimeout();
1057
1058 public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
1059 {
1060 super(AsyncContinuation.this, request,response);
1061 _suspendedContext=context;
1062
1063 Request r=_connection.getRequest();
1064
1065
1066 if (r.getAttribute(AsyncContext.ASYNC_REQUEST_URI)==null)
1067 {
1068
1069
1070
1071
1072 String uri=(String)r.getAttribute(Dispatcher.FORWARD_REQUEST_URI);
1073 if (uri!=null)
1074 {
1075 r.setAttribute(AsyncContext.ASYNC_REQUEST_URI,uri);
1076 r.setAttribute(AsyncContext.ASYNC_CONTEXT_PATH,r.getAttribute(Dispatcher.FORWARD_CONTEXT_PATH));
1077 r.setAttribute(AsyncContext.ASYNC_SERVLET_PATH,r.getAttribute(Dispatcher.FORWARD_SERVLET_PATH));
1078 r.setAttribute(AsyncContext.ASYNC_PATH_INFO,r.getAttribute(Dispatcher.FORWARD_PATH_INFO));
1079 r.setAttribute(AsyncContext.ASYNC_QUERY_STRING,r.getAttribute(Dispatcher.FORWARD_QUERY_STRING));
1080 }
1081 else
1082 {
1083 r.setAttribute(AsyncContext.ASYNC_REQUEST_URI,r.getRequestURI());
1084 r.setAttribute(AsyncContext.ASYNC_CONTEXT_PATH,r.getContextPath());
1085 r.setAttribute(AsyncContext.ASYNC_SERVLET_PATH,r.getServletPath());
1086 r.setAttribute(AsyncContext.ASYNC_PATH_INFO,r.getPathInfo());
1087 r.setAttribute(AsyncContext.ASYNC_QUERY_STRING,r.getQueryString());
1088 }
1089 }
1090 }
1091
1092 public ServletContext getSuspendedContext()
1093 {
1094 return _suspendedContext;
1095 }
1096
1097 public ServletContext getDispatchContext()
1098 {
1099 return _dispatchContext;
1100 }
1101
1102 public ServletContext getServletContext()
1103 {
1104 return _dispatchContext==null?_suspendedContext:_dispatchContext;
1105 }
1106
1107
1108
1109
1110
1111 public String getPath()
1112 {
1113 return _pathInContext;
1114 }
1115 }
1116 }