1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server;
15
16 import javax.servlet.AsyncContext;
17 import javax.servlet.AsyncEvent;
18 import javax.servlet.AsyncListener;
19 import javax.servlet.ServletResponseWrapper;
20 import javax.servlet.ServletException;
21
22 import java.util.ArrayList;
23 import java.util.List;
24
25 import javax.servlet.ServletContext;
26 import javax.servlet.ServletRequest;
27 import javax.servlet.ServletResponse;
28
29 import org.eclipse.jetty.continuation.Continuation;
30 import org.eclipse.jetty.continuation.ContinuationThrowable;
31 import org.eclipse.jetty.continuation.ContinuationListener;
32 import org.eclipse.jetty.io.AsyncEndPoint;
33 import org.eclipse.jetty.io.EndPoint;
34 import org.eclipse.jetty.server.handler.ContextHandler;
35 import org.eclipse.jetty.server.handler.ContextHandler.Context;
36 import org.eclipse.jetty.util.log.Log;
37 import org.eclipse.jetty.util.log.Logger;
38 import org.eclipse.jetty.util.thread.Timeout;
39
40
41
42
43
44 public class AsyncContinuation implements AsyncContext, Continuation
45 {
private static final Logger LOG = Log.getLogger(AsyncContinuation.class);

/** Timeout in milliseconds used until {@link #setTimeout(long)} is called; restored by {@link #recycle()}. */
private final static long DEFAULT_TIMEOUT=30000L;

/** Shared throwable thrown by {@link #undispatch()} when debug logging is not enabled. */
private final static ContinuationThrowable __exception = new ContinuationThrowable();

// STATES and the transitions performed by the methods of this class
// (an empty cell means the method makes no transition from that state,
// which for most methods results in an IllegalStateException):
//
//                handling()    suspend()     unhandle()    dispatch()     complete()   doComplete()
// IDLE           DISPATCHED
// DISPATCHED                   ASYNCSTARTED  UNCOMPLETED
// ASYNCSTARTED                               ASYNCWAIT     REDISPATCHING  COMPLETING
// REDISPATCHING                              REDISPATCHED
// ASYNCWAIT      (unchanged)                               REDISPATCH     COMPLETING
// REDISPATCH     REDISPATCHED                              (unchanged)
// REDISPATCHED                 ASYNCSTARTED  UNCOMPLETED
// COMPLETING     UNCOMPLETED                 UNCOMPLETED
// UNCOMPLETED                                                                          COMPLETED
// COMPLETED
//
// recycle() returns to IDLE from every state except DISPATCHED and REDISPATCHED.

private static final int __IDLE=0;          // initial state; also the state set by recycle()
private static final int __DISPATCHED=1;    // handling() was called while IDLE
private static final int __ASYNCSTARTED=2;  // suspend was called while DISPATCHED or REDISPATCHED
private static final int __REDISPATCHING=3; // dispatch() was called while ASYNCSTARTED
private static final int __ASYNCWAIT=4;     // unhandle() was called while ASYNCSTARTED
private static final int __REDISPATCH=5;    // dispatch() was called while ASYNCWAIT
private static final int __REDISPATCHED=6;  // handling() from REDISPATCH, or unhandle() from REDISPATCHING
private static final int __COMPLETING=7;    // complete() was called while ASYNCSTARTED or ASYNCWAIT
private static final int __UNCOMPLETED=8;   // handling finished; doComplete() has not yet run
private static final int __COMPLETED=9;     // doComplete() has run


/** Connection this continuation belongs to; assigned by {@link #setConnection(HttpConnection)}. */
protected HttpConnection _connection;
/** Listeners of the previous async cycle; they receive onStartAsync when the request is suspended again. */
private List<AsyncListener> _lastAsyncListeners;
/** Listeners registered for the current async cycle; notified on timeout, completion and error. */
private List<AsyncListener> _asyncListeners;
/** Continuation API listeners; discarded by {@link #recycle()} and {@link #cancel()}. */
private List<ContinuationListener> _continuationListeners;


/** Current state, one of the __XXX constants above; accessed while synchronized on this object. */
private int _state;
/** True from construction/IDLE handling until unhandle() leaves an async state. */
private boolean _initial;
/** Set by dispatch(); cleared when the request is suspended or recycled. */
private boolean _resumed;
/** Set by expired(); cleared when the request is suspended or recycled. */
private boolean _expired;
/** True when the request was suspended with a ServletResponseWrapper via suspend(ServletResponse). */
private volatile boolean _responseWrapped;
/** Timeout in milliseconds; values that are not positive disable timeout scheduling. */
private long _timeoutMs=DEFAULT_TIMEOUT;
/** Event created by suspend; it is reused when the same context/request/response are supplied again. */
private AsyncEventState _event;
/** Expiry time (System.currentTimeMillis() based) used for blocking end points; 0 means cancelled. */
private volatile long _expireAt;
/** True when the request was suspended through the Continuation API (suspend() / suspend(ServletResponse)). */
private volatile boolean _continuation;
92
93
/**
 * Creates a continuation that starts in the IDLE state and is flagged
 * as being in its initial dispatch.
 */
protected AsyncContinuation()
{
    _initial=true;
    _state=__IDLE;
}
99
100
101 protected void setConnection(final HttpConnection connection)
102 {
103 synchronized(this)
104 {
105 _connection=connection;
106 }
107 }
108
109
110 public void addListener(AsyncListener listener)
111 {
112 synchronized(this)
113 {
114 if (_asyncListeners==null)
115 _asyncListeners=new ArrayList<AsyncListener>();
116 _asyncListeners.add(listener);
117 }
118 }
119
120
121 public void addListener(AsyncListener listener,ServletRequest request, ServletResponse response)
122 {
123 synchronized(this)
124 {
125
126 if (_asyncListeners==null)
127 _asyncListeners=new ArrayList<AsyncListener>();
128 _asyncListeners.add(listener);
129 }
130 }
131
132
133 public void addContinuationListener(ContinuationListener listener)
134 {
135 synchronized(this)
136 {
137 if (_continuationListeners==null)
138 _continuationListeners=new ArrayList<ContinuationListener>();
139 _continuationListeners.add(listener);
140 }
141 }
142
143
144 public void setTimeout(long ms)
145 {
146 synchronized(this)
147 {
148 _timeoutMs=ms;
149 }
150 }
151
152
153 public long getTimeout()
154 {
155 synchronized(this)
156 {
157 return _timeoutMs;
158 }
159 }
160
161
162 public AsyncEventState getAsyncEventState()
163 {
164 synchronized(this)
165 {
166 return _event;
167 }
168 }
169
170
171
172
173
174
175
176
177
178
179 public boolean isResponseWrapped()
180 {
181 return _responseWrapped;
182 }
183
184
185
186
187
188 public boolean isInitial()
189 {
190 synchronized(this)
191 {
192 return _initial;
193 }
194 }
195
196
197
198
199
200 public boolean isSuspended()
201 {
202 synchronized(this)
203 {
204 switch(_state)
205 {
206 case __ASYNCSTARTED:
207 case __REDISPATCHING:
208 case __COMPLETING:
209 case __ASYNCWAIT:
210 return true;
211
212 default:
213 return false;
214 }
215 }
216 }
217
218
219 @Override
220 public String toString()
221 {
222 synchronized (this)
223 {
224 return super.toString()+"@"+getStatusString();
225 }
226 }
227
228
229 public String getStatusString()
230 {
231 synchronized (this)
232 {
233 return
234 ((_state==__IDLE)?"IDLE":
235 (_state==__DISPATCHED)?"DISPATCHED":
236 (_state==__ASYNCSTARTED)?"ASYNCSTARTED":
237 (_state==__ASYNCWAIT)?"ASYNCWAIT":
238 (_state==__REDISPATCHING)?"REDISPATCHING":
239 (_state==__REDISPATCH)?"REDISPATCH":
240 (_state==__REDISPATCHED)?"REDISPATCHED":
241 (_state==__COMPLETING)?"COMPLETING":
242 (_state==__UNCOMPLETED)?"UNCOMPLETED":
243 (_state==__COMPLETED)?"COMPLETE":
244 ("UNKNOWN?"+_state))+
245 (_initial?",initial":"")+
246 (_resumed?",resumed":"")+
247 (_expired?",expired":"");
248 }
249 }
250
251
252
253
254
255 protected boolean handling()
256 {
257 synchronized (this)
258 {
259 _continuation=false;
260 _responseWrapped=false;
261
262 switch(_state)
263 {
264 case __IDLE:
265 _initial=true;
266 _state=__DISPATCHED;
267 if (_lastAsyncListeners!=null)
268 _lastAsyncListeners.clear();
269 if (_asyncListeners!=null)
270 _asyncListeners.clear();
271 else
272 {
273 _asyncListeners=_lastAsyncListeners;
274 _lastAsyncListeners=null;
275 }
276 return true;
277
278 case __COMPLETING:
279 _state=__UNCOMPLETED;
280 return false;
281
282 case __ASYNCWAIT:
283 return false;
284
285 case __REDISPATCH:
286 _state=__REDISPATCHED;
287 return true;
288
289 default:
290 throw new IllegalStateException(this.getStatusString());
291 }
292 }
293 }
294
295
296
297
298
299 protected void suspend(final ServletContext context,
300 final ServletRequest request,
301 final ServletResponse response)
302 {
303 synchronized (this)
304 {
305 switch(_state)
306 {
307 case __DISPATCHED:
308 case __REDISPATCHED:
309 _resumed=false;
310 _expired=false;
311
312 if (_event==null || request!=_event.getSuppliedRequest() || response != _event.getSuppliedResponse() || context != _event.getServletContext())
313 _event=new AsyncEventState(context,request,response);
314 else
315 {
316 _event._dispatchContext=null;
317 _event._path=null;
318 }
319
320 _state=__ASYNCSTARTED;
321 List<AsyncListener> recycle=_lastAsyncListeners;
322 _lastAsyncListeners=_asyncListeners;
323 _asyncListeners=recycle;
324 if (_asyncListeners!=null)
325 _asyncListeners.clear();
326 break;
327
328 default:
329 throw new IllegalStateException(this.getStatusString());
330 }
331 }
332
333 if (_lastAsyncListeners!=null)
334 {
335 for (AsyncListener listener : _lastAsyncListeners)
336 {
337 try
338 {
339 listener.onStartAsync(_event);
340 }
341 catch(Exception e)
342 {
343 LOG.warn(e);
344 }
345 }
346 }
347
348 }
349
350
351
352
353
354
355
356
357
/**
 * Called when the thread handling the request is finishing its dispatch.
 *
 * @return true when the state ends as UNCOMPLETED or ASYNCWAIT; false
 *         when the state ends as REDISPATCHED (a dispatch was requested
 *         before or while unhandling — presumably the caller then
 *         handles the request again; confirm against the caller)
 * @throws IllegalStateException if the state is IDLE or any state not
 *                               handled below
 */
protected boolean unhandle()
{
    synchronized (this)
    {
        switch(_state)
        {
            case __REDISPATCHED:
            case __DISPATCHED:
                // Request was not (re)suspended during this dispatch
                _state=__UNCOMPLETED;
                return true;

            case __IDLE:
                throw new IllegalStateException(this.getStatusString());

            case __ASYNCSTARTED:
                _initial=false;
                _state=__ASYNCWAIT;
                // For a blocking end point scheduleTimeout() waits on this
                // object's monitor, so other threads may change _state
                // before it returns; hence the state is re-examined below.
                scheduleTimeout();
                if (_state==__ASYNCWAIT)
                    return true;
                else if (_state==__COMPLETING)
                {
                    _state=__UNCOMPLETED;
                    return true;
                }

                // Any other state reached during the wait is treated as a redispatch
                _initial=false;
                _state=__REDISPATCHED;
                return false;

            case __REDISPATCHING:
                // dispatch() was called while still ASYNCSTARTED
                _initial=false;
                _state=__REDISPATCHED;
                return false;

            case __COMPLETING:
                // complete() was called while still ASYNCSTARTED
                _initial=false;
                _state=__UNCOMPLETED;
                return true;

            default:
                throw new IllegalStateException(this.getStatusString());
        }
    }
}
403
404
405 public void dispatch()
406 {
407 boolean dispatch=false;
408 synchronized (this)
409 {
410 switch(_state)
411 {
412 case __ASYNCSTARTED:
413 _state=__REDISPATCHING;
414 _resumed=true;
415 return;
416
417 case __ASYNCWAIT:
418 dispatch=!_expired;
419 _state=__REDISPATCH;
420 _resumed=true;
421 break;
422
423 case __REDISPATCH:
424 return;
425
426 default:
427 throw new IllegalStateException(this.getStatusString());
428 }
429 }
430
431 if (dispatch)
432 {
433 cancelTimeout();
434 scheduleDispatch();
435 }
436 }
437
438
/**
 * Handles expiry of the async timeout.
 * <p>
 * Does nothing unless the state is ASYNCSTARTED or ASYNCWAIT. Otherwise
 * the expired flag is set, the async and continuation listeners are
 * notified of the timeout outside the lock, and then, if the state is
 * still ASYNCSTARTED or ASYNCWAIT, the request is either dispatched
 * (continuation style suspend) or completed. Finally a dispatch is
 * scheduled.
 */
protected void expired()
{
    final List<ContinuationListener> cListeners;
    final List<AsyncListener> aListeners;
    synchronized (this)
    {
        switch(_state)
        {
            case __ASYNCSTARTED:
            case __ASYNCWAIT:
                // Capture the listener lists while holding the lock
                cListeners=_continuationListeners;
                aListeners=_asyncListeners;
                break;
            default:
                // Not suspended: the expiry is ignored and _expired is left untouched
                cListeners=null;
                aListeners=null;
                return;
        }
        _expired=true;
    }

    // Listeners are called without holding the lock; exceptions from one
    // listener are logged and do not stop the remaining listeners.
    if (aListeners!=null)
    {
        for (AsyncListener listener : aListeners)
        {
            try
            {
                listener.onTimeout(_event);
            }
            catch(Exception e)
            {
                LOG.warn(e);
            }
        }
    }
    if (cListeners!=null)
    {
        for (ContinuationListener listener : cListeners)
        {
            try
            {
                listener.onTimeout(this);
            }
            catch(Exception e)
            {
                LOG.warn(e);
            }
        }
    }



    synchronized (this)
    {
        // Re-check the state: a listener may already have called
        // dispatch() or complete(), in which case nothing is done here.
        switch(_state)
        {
            case __ASYNCSTARTED:
            case __ASYNCWAIT:
                if (_continuation)
                    dispatch();
                else
                    // not suspended via the Continuation API: end the async cycle
                    complete();
        }
    }

    // dispatch() and complete() do not schedule a dispatch themselves
    // once _expired is set (from ASYNCWAIT), so it is scheduled here.
    scheduleDispatch();
}
507
508
509
510
511
512 public void complete()
513 {
514
515 boolean dispatch=false;
516 synchronized (this)
517 {
518 switch(_state)
519 {
520 case __DISPATCHED:
521 case __REDISPATCHED:
522 throw new IllegalStateException(this.getStatusString());
523
524 case __ASYNCSTARTED:
525 _state=__COMPLETING;
526 return;
527
528 case __ASYNCWAIT:
529 _state=__COMPLETING;
530 dispatch=!_expired;
531 break;
532
533 default:
534 throw new IllegalStateException(this.getStatusString());
535 }
536 }
537
538 if (dispatch)
539 {
540 cancelTimeout();
541 scheduleDispatch();
542 }
543 }
544
545
546 @Override
547 public <T extends AsyncListener> T createListener(Class<T> clazz) throws ServletException
548 {
549 try
550 {
551
552 return clazz.newInstance();
553 }
554 catch(Exception e)
555 {
556 throw new ServletException(e);
557 }
558 }
559
560
561
562
563
564
565 protected void doComplete(Throwable ex)
566 {
567 final List<ContinuationListener> cListeners;
568 final List<AsyncListener> aListeners;
569 synchronized (this)
570 {
571 switch(_state)
572 {
573 case __UNCOMPLETED:
574 _state=__COMPLETED;
575 cListeners=_continuationListeners;
576 aListeners=_asyncListeners;
577 break;
578
579 default:
580 cListeners=null;
581 aListeners=null;
582 throw new IllegalStateException(this.getStatusString());
583 }
584 }
585
586 if (aListeners!=null)
587 {
588 for (AsyncListener listener : aListeners)
589 {
590 try
591 {
592 if (ex!=null)
593 {
594 _event.getSuppliedRequest().setAttribute(Dispatcher.ERROR_EXCEPTION,ex);
595 _event.getSuppliedRequest().setAttribute(Dispatcher.ERROR_MESSAGE,ex.getMessage());
596 listener.onError(_event);
597 }
598 else
599 listener.onComplete(_event);
600 }
601 catch(Exception e)
602 {
603 LOG.warn(e);
604 }
605 }
606 }
607 if (cListeners!=null)
608 {
609 for (ContinuationListener listener : cListeners)
610 {
611 try
612 {
613 listener.onComplete(this);
614 }
615 catch(Exception e)
616 {
617 LOG.warn(e);
618 }
619 }
620 }
621 }
622
623
624 protected void recycle()
625 {
626 synchronized (this)
627 {
628 switch(_state)
629 {
630 case __DISPATCHED:
631 case __REDISPATCHED:
632 throw new IllegalStateException(getStatusString());
633 default:
634 _state=__IDLE;
635 }
636 _initial = true;
637 _resumed=false;
638 _expired=false;
639 _responseWrapped=false;
640 cancelTimeout();
641 _timeoutMs=DEFAULT_TIMEOUT;
642 _continuationListeners=null;
643 }
644 }
645
646
647 public void cancel()
648 {
649 synchronized (this)
650 {
651 cancelTimeout();
652 _continuationListeners=null;
653 }
654 }
655
656
657 protected void scheduleDispatch()
658 {
659 EndPoint endp=_connection.getEndPoint();
660 if (!endp.isBlocking())
661 {
662 ((AsyncEndPoint)endp).dispatch();
663 }
664 }
665
666
/**
 * Starts the async timeout, if the configured timeout is positive.
 * <p>
 * For a blocking end point the calling thread itself waits on this
 * object's monitor until the timeout is cancelled (see
 * {@link #cancelTimeout()}), the expiry time passes or the server stops
 * running; if the expiry time passed, {@link #expired()} is called from
 * this thread. For other end points the event's timeout task is
 * scheduled with the connection.
 */
protected void scheduleTimeout()
{
    EndPoint endp=_connection.getEndPoint();
    if (_timeoutMs>0)
    {
        if (endp.isBlocking())
        {
            synchronized(this)
            {
                _expireAt = System.currentTimeMillis()+_timeoutMs;
                long wait=_timeoutMs;
                // _expireAt is zeroed by cancelTimeout(), which also notifies this monitor
                while (_expireAt>0 && wait>0 && _connection.getServer().isRunning())
                {
                    try
                    {
                        this.wait(wait);
                    }
                    catch (InterruptedException e)
                    {
                        // Interruption is ignored; the remaining wait is recomputed below
                        LOG.ignore(e);
                    }
                    wait=_expireAt-System.currentTimeMillis();
                }

                // Expire only if not cancelled, the time really ran out and the server is still running
                if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning())
                {
                    expired();
                }
            }
        }
        else
        {
            _connection.scheduleTimeout(_event._timeout,_timeoutMs);
        }
    }
}
703
704
705 protected void cancelTimeout()
706 {
707 EndPoint endp=_connection.getEndPoint();
708 if (endp.isBlocking())
709 {
710 synchronized(this)
711 {
712 _expireAt=0;
713 this.notifyAll();
714 }
715 }
716 else
717 {
718 final AsyncEventState event=_event;
719 if (event!=null)
720 _connection.cancelTimeout(event._timeout);
721 }
722 }
723
724
725 public boolean isCompleting()
726 {
727 synchronized (this)
728 {
729 return _state==__COMPLETING;
730 }
731 }
732
733
734 boolean isUncompleted()
735 {
736 synchronized (this)
737 {
738 return _state==__UNCOMPLETED;
739 }
740 }
741
742
743 public boolean isComplete()
744 {
745 synchronized (this)
746 {
747 return _state==__COMPLETED;
748 }
749 }
750
751
752
753 public boolean isAsyncStarted()
754 {
755 synchronized (this)
756 {
757 switch(_state)
758 {
759 case __ASYNCSTARTED:
760 case __REDISPATCHING:
761 case __REDISPATCH:
762 case __ASYNCWAIT:
763 return true;
764
765 default:
766 return false;
767 }
768 }
769 }
770
771
772
773 public boolean isAsync()
774 {
775 synchronized (this)
776 {
777 switch(_state)
778 {
779 case __IDLE:
780 case __DISPATCHED:
781 case __UNCOMPLETED:
782 case __COMPLETED:
783 return false;
784
785 default:
786 return true;
787 }
788 }
789 }
790
791
792 public void dispatch(ServletContext context, String path)
793 {
794 _event._dispatchContext=context;
795 _event._path=path;
796 dispatch();
797 }
798
799
800 public void dispatch(String path)
801 {
802 _event._path=path;
803 dispatch();
804 }
805
806
807 public Request getBaseRequest()
808 {
809 return _connection.getRequest();
810 }
811
812
813 public ServletRequest getRequest()
814 {
815 if (_event!=null)
816 return _event.getSuppliedRequest();
817 return _connection.getRequest();
818 }
819
820
821 public ServletResponse getResponse()
822 {
823 if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null)
824 return _event.getSuppliedResponse();
825 return _connection.getResponse();
826 }
827
828
829 public void start(final Runnable run)
830 {
831 final AsyncEventState event=_event;
832 if (event!=null)
833 {
834 _connection.getServer().getThreadPool().dispatch(new Runnable()
835 {
836 public void run()
837 {
838 ((Context)event.getServletContext()).getContextHandler().handle(run);
839 }
840 });
841 }
842 }
843
844
845 public boolean hasOriginalRequestAndResponse()
846 {
847 synchronized (this)
848 {
849 return (_event!=null && _event.getSuppliedRequest()==_connection._request && _event.getSuppliedResponse()==_connection._response);
850 }
851 }
852
853
854 public ContextHandler getContextHandler()
855 {
856 final AsyncEventState event=_event;
857 if (event!=null)
858 return ((Context)event.getServletContext()).getContextHandler();
859 return null;
860 }
861
862
863
864
865
866
867 public boolean isResumed()
868 {
869 synchronized (this)
870 {
871 return _resumed;
872 }
873 }
874
875
876
877
878 public boolean isExpired()
879 {
880 synchronized (this)
881 {
882 return _expired;
883 }
884 }
885
886
887
888
889
890 public void resume()
891 {
892 dispatch();
893 }
894
895
896
897
898
899 public void suspend(ServletResponse response)
900 {
901 _continuation=true;
902 if (response instanceof ServletResponseWrapper)
903 {
904 _responseWrapped=true;
905 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response);
906 }
907 else
908 {
909 _responseWrapped=false;
910 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());
911 }
912 }
913
914
915
916
917
918 public void suspend()
919 {
920 _responseWrapped=false;
921 _continuation=true;
922 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());
923 }
924
925
926
927
928
929 public ServletResponse getServletResponse()
930 {
931 if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null)
932 return _event.getSuppliedResponse();
933 return _connection.getResponse();
934 }
935
936
937
938
939
940 public Object getAttribute(String name)
941 {
942 return _connection.getRequest().getAttribute(name);
943 }
944
945
946
947
948
949 public void removeAttribute(String name)
950 {
951 _connection.getRequest().removeAttribute(name);
952 }
953
954
955
956
957
958 public void setAttribute(String name, Object attribute)
959 {
960 _connection.getRequest().setAttribute(name,attribute);
961 }
962
963
964
965
966
967 public void undispatch()
968 {
969 if (isSuspended())
970 {
971 if (LOG.isDebugEnabled())
972 throw new ContinuationThrowable();
973 else
974 throw __exception;
975 }
976 throw new IllegalStateException("!suspended");
977 }
978
979
980
981 public class AsyncTimeout extends Timeout.Task implements Runnable
982 {
983 @Override
984 public void expired()
985 {
986 AsyncContinuation.this.expired();
987 }
988
989 @Override
990 public void run()
991 {
992 AsyncContinuation.this.expired();
993 }
994 }
995
996
997
998 public class AsyncEventState extends AsyncEvent
999 {
1000 private final ServletContext _suspendedContext;
1001 private ServletContext _dispatchContext;
1002 private String _path;
1003 private Timeout.Task _timeout= new AsyncTimeout();
1004
1005 public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
1006 {
1007 super(AsyncContinuation.this, request,response);
1008 _suspendedContext=context;
1009 }
1010
1011 public ServletContext getSuspendedContext()
1012 {
1013 return _suspendedContext;
1014 }
1015
1016 public ServletContext getDispatchContext()
1017 {
1018 return _dispatchContext;
1019 }
1020
1021 public ServletContext getServletContext()
1022 {
1023 return _dispatchContext==null?_suspendedContext:_dispatchContext;
1024 }
1025
1026 public String getPath()
1027 {
1028 return _path;
1029 }
1030 }
1031 }