1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server;
15
16 import javax.servlet.AsyncContext;
17 import javax.servlet.AsyncEvent;
18 import javax.servlet.AsyncListener;
19 import javax.servlet.ServletResponseWrapper;
20 import javax.servlet.ServletException;
21
22 import java.util.ArrayList;
23 import java.util.List;
24
25 import javax.servlet.ServletContext;
26 import javax.servlet.ServletRequest;
27 import javax.servlet.ServletResponse;
28
29 import org.eclipse.jetty.continuation.Continuation;
30 import org.eclipse.jetty.continuation.ContinuationThrowable;
31 import org.eclipse.jetty.continuation.ContinuationListener;
32 import org.eclipse.jetty.io.AsyncEndPoint;
33 import org.eclipse.jetty.io.EndPoint;
34 import org.eclipse.jetty.server.handler.ContextHandler;
35 import org.eclipse.jetty.server.handler.ContextHandler.Context;
36 import org.eclipse.jetty.util.log.Log;
37 import org.eclipse.jetty.util.log.Logger;
38 import org.eclipse.jetty.util.thread.Timeout;
39
40
41
42
43
44 public class AsyncContinuation implements AsyncContext, Continuation
45 {
/** Logger used for listener failures and ignored interrupts. */
private static final Logger LOG = Log.getLogger(AsyncContinuation.class);

/** Timeout in milliseconds applied on construction and restored by recycle(). */
private final static long DEFAULT_TIMEOUT=30000L;

/** Shared throwable instance thrown by undispatch() when debug logging is disabled. */
private final static ContinuationThrowable __exception = new ContinuationThrowable();

// Values of _state. Transitions as implemented in this class:
//   IDLE          -> DISPATCHED     by handling()
//   DISPATCHED    -> ASYNCSTARTED   by suspend(); -> UNCOMPLETED by unhandle()
//   ASYNCSTARTED  -> REDISPATCHING  by dispatch(); -> COMPLETING by complete();
//                 -> ASYNCWAIT      by unhandle()
//   REDISPATCHING -> REDISPATCHED   by unhandle()
//   ASYNCWAIT     -> REDISPATCH     by dispatch(); -> COMPLETING by complete()
//   REDISPATCH    -> REDISPATCHED   by handling()
//   REDISPATCHED  -> ASYNCSTARTED   by suspend(); -> UNCOMPLETED by unhandle()
//   COMPLETING    -> UNCOMPLETED    by handling() or unhandle()
//   UNCOMPLETED   -> COMPLETED      by doComplete()
//   any state except DISPATCHED/REDISPATCHED -> IDLE by recycle()
private static final int __IDLE=0;
private static final int __DISPATCHED=1;
private static final int __ASYNCSTARTED=2;
private static final int __REDISPATCHING=3;
private static final int __ASYNCWAIT=4;
private static final int __REDISPATCH=5;
private static final int __REDISPATCHED=6;
private static final int __COMPLETING=7;
private static final int __UNCOMPLETED=8;
private static final int __COMPLETED=9;

/** Connection this continuation belongs to; assigned by setConnection(). */
protected AbstractHttpConnection _connection;
/** Listener list swapped out of _asyncListeners by suspend(); these receive onStartAsync. */
private List<AsyncListener> _lastAsyncListeners;
/** Listeners registered through addListener(); notified by expired() and doComplete(). */
private List<AsyncListener> _asyncListeners;
/** Continuation listeners; cleared by recycle() and cancel(). */
private List<ContinuationListener> _continuationListeners;

/** Current state, one of the __XXX constants above; guarded by this object's monitor. */
private int _state;
/** Set true by the constructor, handling() from IDLE and recycle(); cleared by unhandle() once async has started. */
private boolean _initial;
/** Set by dispatch(); cleared by suspend() and recycle(). */
private boolean _resumed;
/** Set by expired(); cleared by suspend() and recycle(). */
private boolean _expired;
/** True when suspend(ServletResponse) was given a ServletResponseWrapper; cleared by handling() and recycle(). */
private volatile boolean _responseWrapped;
/** Timeout in milliseconds; values <= 0 make scheduleTimeout() do nothing. */
private long _timeoutMs=DEFAULT_TIMEOUT;
/** Event state created or reused by suspend(); passed to async listeners. */
private AsyncEventState _event;
/** Deadline (System.currentTimeMillis() based) used for blocking end points; zeroed by cancelTimeout(). */
private volatile long _expireAt;
/** True when suspended via suspend()/suspend(ServletResponse); makes expired() dispatch rather than complete. */
private volatile boolean _continuation;
92
93
/**
 * Creates a continuation that starts in the IDLE state and is flagged as initial.
 */
protected AsyncContinuation()
{
    this._initial = true;
    this._state = __IDLE;
}
99
100
101 protected void setConnection(final AbstractHttpConnection connection)
102 {
103 synchronized(this)
104 {
105 _connection=connection;
106 }
107 }
108
109
110 public void addListener(AsyncListener listener)
111 {
112 synchronized(this)
113 {
114 if (_asyncListeners==null)
115 _asyncListeners=new ArrayList<AsyncListener>();
116 _asyncListeners.add(listener);
117 }
118 }
119
120
121 public void addListener(AsyncListener listener,ServletRequest request, ServletResponse response)
122 {
123 synchronized(this)
124 {
125
126 if (_asyncListeners==null)
127 _asyncListeners=new ArrayList<AsyncListener>();
128 _asyncListeners.add(listener);
129 }
130 }
131
132
133 public void addContinuationListener(ContinuationListener listener)
134 {
135 synchronized(this)
136 {
137 if (_continuationListeners==null)
138 _continuationListeners=new ArrayList<ContinuationListener>();
139 _continuationListeners.add(listener);
140 }
141 }
142
143
144 public void setTimeout(long ms)
145 {
146 synchronized(this)
147 {
148 _timeoutMs=ms;
149 }
150 }
151
152
153 public long getTimeout()
154 {
155 synchronized(this)
156 {
157 return _timeoutMs;
158 }
159 }
160
161
162 public AsyncEventState getAsyncEventState()
163 {
164 synchronized(this)
165 {
166 return _event;
167 }
168 }
169
170
171
172
173
174
175
176
177
178
179 public boolean isResponseWrapped()
180 {
181 return _responseWrapped;
182 }
183
184
185
186
187
188 public boolean isInitial()
189 {
190 synchronized(this)
191 {
192 return _initial;
193 }
194 }
195
196
197
198
199
200 public boolean isSuspended()
201 {
202 synchronized(this)
203 {
204 switch(_state)
205 {
206 case __ASYNCSTARTED:
207 case __REDISPATCHING:
208 case __COMPLETING:
209 case __ASYNCWAIT:
210 return true;
211
212 default:
213 return false;
214 }
215 }
216 }
217
218
219 public boolean isSuspending()
220 {
221 synchronized(this)
222 {
223 switch(_state)
224 {
225 case __ASYNCSTARTED:
226 case __ASYNCWAIT:
227 return true;
228
229 default:
230 return false;
231 }
232 }
233 }
234
235
236 public boolean isDispatchable()
237 {
238 synchronized(this)
239 {
240 switch(_state)
241 {
242 case __REDISPATCH:
243 case __REDISPATCHED:
244 case __REDISPATCHING:
245 case __COMPLETING:
246 return true;
247
248 default:
249 return false;
250 }
251 }
252 }
253
254
255 @Override
256 public String toString()
257 {
258 synchronized (this)
259 {
260 return super.toString()+"@"+getStatusString();
261 }
262 }
263
264
265 public String getStatusString()
266 {
267 synchronized (this)
268 {
269 return
270 ((_state==__IDLE)?"IDLE":
271 (_state==__DISPATCHED)?"DISPATCHED":
272 (_state==__ASYNCSTARTED)?"ASYNCSTARTED":
273 (_state==__ASYNCWAIT)?"ASYNCWAIT":
274 (_state==__REDISPATCHING)?"REDISPATCHING":
275 (_state==__REDISPATCH)?"REDISPATCH":
276 (_state==__REDISPATCHED)?"REDISPATCHED":
277 (_state==__COMPLETING)?"COMPLETING":
278 (_state==__UNCOMPLETED)?"UNCOMPLETED":
279 (_state==__COMPLETED)?"COMPLETE":
280 ("UNKNOWN?"+_state))+
281 (_initial?",initial":"")+
282 (_resumed?",resumed":"")+
283 (_expired?",expired":"");
284 }
285 }
286
287
288
289
290
291 protected boolean handling()
292 {
293 synchronized (this)
294 {
295 _continuation=false;
296 _responseWrapped=false;
297
298 switch(_state)
299 {
300 case __IDLE:
301 _initial=true;
302 _state=__DISPATCHED;
303 if (_lastAsyncListeners!=null)
304 _lastAsyncListeners.clear();
305 if (_asyncListeners!=null)
306 _asyncListeners.clear();
307 else
308 {
309 _asyncListeners=_lastAsyncListeners;
310 _lastAsyncListeners=null;
311 }
312 return true;
313
314 case __COMPLETING:
315 _state=__UNCOMPLETED;
316 return false;
317
318 case __ASYNCWAIT:
319 return false;
320
321 case __REDISPATCH:
322 _state=__REDISPATCHED;
323 return true;
324
325 default:
326 throw new IllegalStateException(this.getStatusString());
327 }
328 }
329 }
330
331
332
333
334
335 protected void suspend(final ServletContext context,
336 final ServletRequest request,
337 final ServletResponse response)
338 {
339 synchronized (this)
340 {
341 switch(_state)
342 {
343 case __DISPATCHED:
344 case __REDISPATCHED:
345 _resumed=false;
346 _expired=false;
347
348 if (_event==null || request!=_event.getSuppliedRequest() || response != _event.getSuppliedResponse() || context != _event.getServletContext())
349 _event=new AsyncEventState(context,request,response);
350 else
351 {
352 _event._dispatchContext=null;
353 _event._path=null;
354 }
355
356 _state=__ASYNCSTARTED;
357 List<AsyncListener> recycle=_lastAsyncListeners;
358 _lastAsyncListeners=_asyncListeners;
359 _asyncListeners=recycle;
360 if (_asyncListeners!=null)
361 _asyncListeners.clear();
362 break;
363
364 default:
365 throw new IllegalStateException(this.getStatusString());
366 }
367 }
368
369 if (_lastAsyncListeners!=null)
370 {
371 for (AsyncListener listener : _lastAsyncListeners)
372 {
373 try
374 {
375 listener.onStartAsync(_event);
376 }
377 catch(Exception e)
378 {
379 LOG.warn(e);
380 }
381 }
382 }
383
384 }
385
386
387
388
389
390
391
392
393
394 protected boolean unhandle()
395 {
396 synchronized (this)
397 {
398 switch(_state)
399 {
400 case __REDISPATCHED:
401 case __DISPATCHED:
402 _state=__UNCOMPLETED;
403 return true;
404
405 case __IDLE:
406 throw new IllegalStateException(this.getStatusString());
407
408 case __ASYNCSTARTED:
409 _initial=false;
410 _state=__ASYNCWAIT;
411 scheduleTimeout();
412 if (_state==__ASYNCWAIT)
413 return true;
414 else if (_state==__COMPLETING)
415 {
416 _state=__UNCOMPLETED;
417 return true;
418 }
419 _initial=false;
420 _state=__REDISPATCHED;
421 return false;
422
423 case __REDISPATCHING:
424 _initial=false;
425 _state=__REDISPATCHED;
426 return false;
427
428 case __COMPLETING:
429 _initial=false;
430 _state=__UNCOMPLETED;
431 return true;
432
433 default:
434 throw new IllegalStateException(this.getStatusString());
435 }
436 }
437 }
438
439
440 public void dispatch()
441 {
442 boolean dispatch=false;
443 synchronized (this)
444 {
445 switch(_state)
446 {
447 case __ASYNCSTARTED:
448 _state=__REDISPATCHING;
449 _resumed=true;
450 return;
451
452 case __ASYNCWAIT:
453 dispatch=!_expired;
454 _state=__REDISPATCH;
455 _resumed=true;
456 break;
457
458 case __REDISPATCH:
459 return;
460
461 default:
462 throw new IllegalStateException(this.getStatusString());
463 }
464 }
465
466 if (dispatch)
467 {
468 cancelTimeout();
469 scheduleDispatch();
470 }
471 }
472
473
474 protected void expired()
475 {
476 final List<ContinuationListener> cListeners;
477 final List<AsyncListener> aListeners;
478 synchronized (this)
479 {
480 switch(_state)
481 {
482 case __ASYNCSTARTED:
483 case __ASYNCWAIT:
484 cListeners=_continuationListeners;
485 aListeners=_asyncListeners;
486 break;
487 default:
488 cListeners=null;
489 aListeners=null;
490 return;
491 }
492 _expired=true;
493 }
494
495 if (aListeners!=null)
496 {
497 for (AsyncListener listener : aListeners)
498 {
499 try
500 {
501 listener.onTimeout(_event);
502 }
503 catch(Exception e)
504 {
505 LOG.warn(e);
506 }
507 }
508 }
509 if (cListeners!=null)
510 {
511 for (ContinuationListener listener : cListeners)
512 {
513 try
514 {
515 listener.onTimeout(this);
516 }
517 catch(Exception e)
518 {
519 LOG.warn(e);
520 }
521 }
522 }
523
524
525
526 synchronized (this)
527 {
528 switch(_state)
529 {
530 case __ASYNCSTARTED:
531 case __ASYNCWAIT:
532 if (_continuation)
533 dispatch();
534 else
535
536 complete();
537 }
538 }
539
540 scheduleDispatch();
541 }
542
543
544
545
546
547 public void complete()
548 {
549
550 boolean dispatch=false;
551 synchronized (this)
552 {
553 switch(_state)
554 {
555 case __DISPATCHED:
556 case __REDISPATCHED:
557 throw new IllegalStateException(this.getStatusString());
558
559 case __ASYNCSTARTED:
560 _state=__COMPLETING;
561 return;
562
563 case __ASYNCWAIT:
564 _state=__COMPLETING;
565 dispatch=!_expired;
566 break;
567
568 default:
569 throw new IllegalStateException(this.getStatusString());
570 }
571 }
572
573 if (dispatch)
574 {
575 cancelTimeout();
576 scheduleDispatch();
577 }
578 }
579
580
581 @Override
582 public <T extends AsyncListener> T createListener(Class<T> clazz) throws ServletException
583 {
584 try
585 {
586
587 return clazz.newInstance();
588 }
589 catch(Exception e)
590 {
591 throw new ServletException(e);
592 }
593 }
594
595
596
597
598
599
600 protected void doComplete(Throwable ex)
601 {
602 final List<ContinuationListener> cListeners;
603 final List<AsyncListener> aListeners;
604 synchronized (this)
605 {
606 switch(_state)
607 {
608 case __UNCOMPLETED:
609 _state=__COMPLETED;
610 cListeners=_continuationListeners;
611 aListeners=_asyncListeners;
612 break;
613
614 default:
615 cListeners=null;
616 aListeners=null;
617 throw new IllegalStateException(this.getStatusString());
618 }
619 }
620
621 if (aListeners!=null)
622 {
623 for (AsyncListener listener : aListeners)
624 {
625 try
626 {
627 if (ex!=null)
628 {
629 _event.getSuppliedRequest().setAttribute(Dispatcher.ERROR_EXCEPTION,ex);
630 _event.getSuppliedRequest().setAttribute(Dispatcher.ERROR_MESSAGE,ex.getMessage());
631 listener.onError(_event);
632 }
633 else
634 listener.onComplete(_event);
635 }
636 catch(Exception e)
637 {
638 LOG.warn(e);
639 }
640 }
641 }
642 if (cListeners!=null)
643 {
644 for (ContinuationListener listener : cListeners)
645 {
646 try
647 {
648 listener.onComplete(this);
649 }
650 catch(Exception e)
651 {
652 LOG.warn(e);
653 }
654 }
655 }
656 }
657
658
659 protected void recycle()
660 {
661 synchronized (this)
662 {
663 switch(_state)
664 {
665 case __DISPATCHED:
666 case __REDISPATCHED:
667 throw new IllegalStateException(getStatusString());
668 default:
669 _state=__IDLE;
670 }
671 _initial = true;
672 _resumed=false;
673 _expired=false;
674 _responseWrapped=false;
675 cancelTimeout();
676 _timeoutMs=DEFAULT_TIMEOUT;
677 _continuationListeners=null;
678 }
679 }
680
681
682 public void cancel()
683 {
684 synchronized (this)
685 {
686 cancelTimeout();
687 _continuationListeners=null;
688 }
689 }
690
691
692 protected void scheduleDispatch()
693 {
694 EndPoint endp=_connection.getEndPoint();
695 if (!endp.isBlocking())
696 {
697 ((AsyncEndPoint)endp).asyncDispatch();
698 }
699 }
700
701
702 protected void scheduleTimeout()
703 {
704 EndPoint endp=_connection.getEndPoint();
705 if (_timeoutMs>0)
706 {
707 if (endp.isBlocking())
708 {
709 synchronized(this)
710 {
711 _expireAt = System.currentTimeMillis()+_timeoutMs;
712 long wait=_timeoutMs;
713 while (_expireAt>0 && wait>0 && _connection.getServer().isRunning())
714 {
715 try
716 {
717 this.wait(wait);
718 }
719 catch (InterruptedException e)
720 {
721 LOG.ignore(e);
722 }
723 wait=_expireAt-System.currentTimeMillis();
724 }
725
726 if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning())
727 {
728 expired();
729 }
730 }
731 }
732 else
733 {
734 ((AsyncEndPoint)endp).scheduleTimeout(_event._timeout,_timeoutMs);
735 }
736 }
737 }
738
739
740 protected void cancelTimeout()
741 {
742 EndPoint endp=_connection.getEndPoint();
743 if (endp.isBlocking())
744 {
745 synchronized(this)
746 {
747 _expireAt=0;
748 this.notifyAll();
749 }
750 }
751 else
752 {
753 final AsyncEventState event=_event;
754 if (event!=null)
755 {
756 ((AsyncEndPoint)endp).cancelTimeout(event._timeout);
757 }
758 }
759 }
760
761
762 public boolean isCompleting()
763 {
764 synchronized (this)
765 {
766 return _state==__COMPLETING;
767 }
768 }
769
770
771 boolean isUncompleted()
772 {
773 synchronized (this)
774 {
775 return _state==__UNCOMPLETED;
776 }
777 }
778
779
780 public boolean isComplete()
781 {
782 synchronized (this)
783 {
784 return _state==__COMPLETED;
785 }
786 }
787
788
789
790 public boolean isAsyncStarted()
791 {
792 synchronized (this)
793 {
794 switch(_state)
795 {
796 case __ASYNCSTARTED:
797 case __REDISPATCHING:
798 case __REDISPATCH:
799 case __ASYNCWAIT:
800 return true;
801
802 default:
803 return false;
804 }
805 }
806 }
807
808
809
810 public boolean isAsync()
811 {
812 synchronized (this)
813 {
814 switch(_state)
815 {
816 case __IDLE:
817 case __DISPATCHED:
818 case __UNCOMPLETED:
819 case __COMPLETED:
820 return false;
821
822 default:
823 return true;
824 }
825 }
826 }
827
828
829 public void dispatch(ServletContext context, String path)
830 {
831 _event._dispatchContext=context;
832 _event._path=path;
833 dispatch();
834 }
835
836
837 public void dispatch(String path)
838 {
839 _event._path=path;
840 dispatch();
841 }
842
843
844 public Request getBaseRequest()
845 {
846 return _connection.getRequest();
847 }
848
849
850 public ServletRequest getRequest()
851 {
852 if (_event!=null)
853 return _event.getSuppliedRequest();
854 return _connection.getRequest();
855 }
856
857
858 public ServletResponse getResponse()
859 {
860 if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null)
861 return _event.getSuppliedResponse();
862 return _connection.getResponse();
863 }
864
865
866 public void start(final Runnable run)
867 {
868 final AsyncEventState event=_event;
869 if (event!=null)
870 {
871 _connection.getServer().getThreadPool().dispatch(new Runnable()
872 {
873 public void run()
874 {
875 ((Context)event.getServletContext()).getContextHandler().handle(run);
876 }
877 });
878 }
879 }
880
881
882 public boolean hasOriginalRequestAndResponse()
883 {
884 synchronized (this)
885 {
886 return (_event!=null && _event.getSuppliedRequest()==_connection._request && _event.getSuppliedResponse()==_connection._response);
887 }
888 }
889
890
891 public ContextHandler getContextHandler()
892 {
893 final AsyncEventState event=_event;
894 if (event!=null)
895 return ((Context)event.getServletContext()).getContextHandler();
896 return null;
897 }
898
899
900
901
902
903
904 public boolean isResumed()
905 {
906 synchronized (this)
907 {
908 return _resumed;
909 }
910 }
911
912
913
914
915 public boolean isExpired()
916 {
917 synchronized (this)
918 {
919 return _expired;
920 }
921 }
922
923
924
925
926
927 public void resume()
928 {
929 dispatch();
930 }
931
932
933
934
935
936 public void suspend(ServletResponse response)
937 {
938 _continuation=true;
939 if (response instanceof ServletResponseWrapper)
940 {
941 _responseWrapped=true;
942 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response);
943 }
944 else
945 {
946 _responseWrapped=false;
947 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());
948 }
949 }
950
951
952
953
954
955 public void suspend()
956 {
957 _responseWrapped=false;
958 _continuation=true;
959 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());
960 }
961
962
963
964
965
966 public ServletResponse getServletResponse()
967 {
968 if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null)
969 return _event.getSuppliedResponse();
970 return _connection.getResponse();
971 }
972
973
974
975
976
977 public Object getAttribute(String name)
978 {
979 return _connection.getRequest().getAttribute(name);
980 }
981
982
983
984
985
986 public void removeAttribute(String name)
987 {
988 _connection.getRequest().removeAttribute(name);
989 }
990
991
992
993
994
995 public void setAttribute(String name, Object attribute)
996 {
997 _connection.getRequest().setAttribute(name,attribute);
998 }
999
1000
1001
1002
1003
1004 public void undispatch()
1005 {
1006 if (isSuspended())
1007 {
1008 if (LOG.isDebugEnabled())
1009 throw new ContinuationThrowable();
1010 else
1011 throw __exception;
1012 }
1013 throw new IllegalStateException("!suspended");
1014 }
1015
1016
1017
1018 public class AsyncTimeout extends Timeout.Task implements Runnable
1019 {
1020 @Override
1021 public void expired()
1022 {
1023 AsyncContinuation.this.expired();
1024 }
1025
1026 @Override
1027 public void run()
1028 {
1029 AsyncContinuation.this.expired();
1030 }
1031 }
1032
1033
1034
1035 public class AsyncEventState extends AsyncEvent
1036 {
1037 private final ServletContext _suspendedContext;
1038 private ServletContext _dispatchContext;
1039 private String _path;
1040 private Timeout.Task _timeout= new AsyncTimeout();
1041
1042 public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
1043 {
1044 super(AsyncContinuation.this, request,response);
1045 _suspendedContext=context;
1046 }
1047
1048 public ServletContext getSuspendedContext()
1049 {
1050 return _suspendedContext;
1051 }
1052
1053 public ServletContext getDispatchContext()
1054 {
1055 return _dispatchContext;
1056 }
1057
1058 public ServletContext getServletContext()
1059 {
1060 return _dispatchContext==null?_suspendedContext:_dispatchContext;
1061 }
1062
1063 public String getPath()
1064 {
1065 return _path;
1066 }
1067 }
1068 }