1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server;
15
16 import javax.servlet.AsyncContext;
17 import javax.servlet.AsyncEvent;
18 import javax.servlet.AsyncListener;
19 import javax.servlet.ServletResponseWrapper;
20 import javax.servlet.ServletException;
21
22 import java.util.ArrayList;
23 import java.util.List;
24
25 import javax.servlet.ServletContext;
26 import javax.servlet.ServletRequest;
27 import javax.servlet.ServletResponse;
28
29 import org.eclipse.jetty.continuation.Continuation;
30 import org.eclipse.jetty.continuation.ContinuationThrowable;
31 import org.eclipse.jetty.continuation.ContinuationListener;
32 import org.eclipse.jetty.io.AsyncEndPoint;
33 import org.eclipse.jetty.io.EndPoint;
34 import org.eclipse.jetty.server.handler.ContextHandler;
35 import org.eclipse.jetty.server.handler.ContextHandler.Context;
36 import org.eclipse.jetty.util.log.Log;
37 import org.eclipse.jetty.util.thread.Timeout;
38
39
40
41
42
43 public class AsyncContinuation implements AsyncContext, Continuation
44 {
45 private final static long DEFAULT_TIMEOUT=30000L;
46
47 private final static ContinuationThrowable __exception = new ContinuationThrowable();
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 private static final int __IDLE=0;
63 private static final int __DISPATCHED=1;
64 private static final int __ASYNCSTARTED=2;
65 private static final int __REDISPATCHING=3;
66 private static final int __ASYNCWAIT=4;
67 private static final int __REDISPATCH=5;
68 private static final int __REDISPATCHED=6;
69 private static final int __COMPLETING=7;
70 private static final int __UNCOMPLETED=8;
71 private static final int __COMPLETED=9;
72
73
74 protected HttpConnection _connection;
75 private List<AsyncListener> _lastAsyncListeners;
76 private List<AsyncListener> _asyncListeners;
77 private List<ContinuationListener> _continuationListeners;
78
79
80 private int _state;
81 private boolean _initial;
82 private boolean _resumed;
83 private boolean _expired;
84 private volatile boolean _responseWrapped;
85 private long _timeoutMs=DEFAULT_TIMEOUT;
86 private AsyncEventState _event;
87 private volatile long _expireAt;
88 private volatile boolean _continuation;
89
90
/**
 * Creates a continuation that starts in the idle state and is flagged as initial.
 */
protected AsyncContinuation()
{
    _initial = true;
    _state = __IDLE;
}
96
97
98 protected void setConnection(final HttpConnection connection)
99 {
100 synchronized(this)
101 {
102 _connection=connection;
103 }
104 }
105
106
107 public void addListener(AsyncListener listener)
108 {
109 synchronized(this)
110 {
111 if (_asyncListeners==null)
112 _asyncListeners=new ArrayList<AsyncListener>();
113 _asyncListeners.add(listener);
114 }
115 }
116
117
118 public void addListener(AsyncListener listener,ServletRequest request, ServletResponse response)
119 {
120 synchronized(this)
121 {
122
123 if (_asyncListeners==null)
124 _asyncListeners=new ArrayList<AsyncListener>();
125 _asyncListeners.add(listener);
126 }
127 }
128
129
130 public void addContinuationListener(ContinuationListener listener)
131 {
132 synchronized(this)
133 {
134 if (_continuationListeners==null)
135 _continuationListeners=new ArrayList<ContinuationListener>();
136 _continuationListeners.add(listener);
137 }
138 }
139
140
141 public void setTimeout(long ms)
142 {
143 synchronized(this)
144 {
145 _timeoutMs=ms;
146 }
147 }
148
149
150 public long getTimeout()
151 {
152 synchronized(this)
153 {
154 return _timeoutMs;
155 }
156 }
157
158
159 public AsyncEventState getAsyncEventState()
160 {
161 synchronized(this)
162 {
163 return _event;
164 }
165 }
166
167
168
169
170
171
172
173
174
175
176 public boolean isResponseWrapped()
177 {
178 return _responseWrapped;
179 }
180
181
182
183
184
185 public boolean isInitial()
186 {
187 synchronized(this)
188 {
189 return _initial;
190 }
191 }
192
193
194
195
196
197 public boolean isSuspended()
198 {
199 synchronized(this)
200 {
201 switch(_state)
202 {
203 case __ASYNCSTARTED:
204 case __REDISPATCHING:
205 case __COMPLETING:
206 case __ASYNCWAIT:
207 return true;
208
209 default:
210 return false;
211 }
212 }
213 }
214
215
216 @Override
217 public String toString()
218 {
219 synchronized (this)
220 {
221 return super.toString()+"@"+getStatusString();
222 }
223 }
224
225
226 public String getStatusString()
227 {
228 synchronized (this)
229 {
230 return
231 ((_state==__IDLE)?"IDLE":
232 (_state==__DISPATCHED)?"DISPATCHED":
233 (_state==__ASYNCSTARTED)?"ASYNCSTARTED":
234 (_state==__ASYNCWAIT)?"ASYNCWAIT":
235 (_state==__REDISPATCHING)?"REDISPATCHING":
236 (_state==__REDISPATCH)?"REDISPATCH":
237 (_state==__REDISPATCHED)?"REDISPATCHED":
238 (_state==__COMPLETING)?"COMPLETING":
239 (_state==__UNCOMPLETED)?"UNCOMPLETED":
240 (_state==__COMPLETED)?"COMPLETE":
241 ("UNKNOWN?"+_state))+
242 (_initial?",initial":"")+
243 (_resumed?",resumed":"")+
244 (_expired?",expired":"");
245 }
246 }
247
248
249
250
251
252 protected boolean handling()
253 {
254 synchronized (this)
255 {
256 _continuation=false;
257 _responseWrapped=false;
258
259 switch(_state)
260 {
261 case __IDLE:
262 _initial=true;
263 _state=__DISPATCHED;
264 if (_lastAsyncListeners!=null)
265 _lastAsyncListeners.clear();
266 if (_asyncListeners!=null)
267 _asyncListeners.clear();
268 else
269 {
270 _asyncListeners=_lastAsyncListeners;
271 _lastAsyncListeners=null;
272 }
273 return true;
274
275 case __COMPLETING:
276 _state=__UNCOMPLETED;
277 return false;
278
279 case __ASYNCWAIT:
280 return false;
281
282 case __REDISPATCH:
283 _state=__REDISPATCHED;
284 return true;
285
286 default:
287 throw new IllegalStateException(this.getStatusString());
288 }
289 }
290 }
291
292
293
294
295
296 protected void suspend(final ServletContext context,
297 final ServletRequest request,
298 final ServletResponse response)
299 {
300 synchronized (this)
301 {
302 switch(_state)
303 {
304 case __DISPATCHED:
305 case __REDISPATCHED:
306 _resumed=false;
307 _expired=false;
308
309 if (_event==null || request!=_event.getSuppliedRequest() || response != _event.getSuppliedResponse() || context != _event.getServletContext())
310 _event=new AsyncEventState(context,request,response);
311 else
312 {
313 _event._dispatchContext=null;
314 _event._path=null;
315 }
316
317 _state=__ASYNCSTARTED;
318 List<AsyncListener> recycle=_lastAsyncListeners;
319 _lastAsyncListeners=_asyncListeners;
320 _asyncListeners=recycle;
321 if (_asyncListeners!=null)
322 _asyncListeners.clear();
323 break;
324
325 default:
326 throw new IllegalStateException(this.getStatusString());
327 }
328 }
329
330 if (_lastAsyncListeners!=null)
331 {
332 for (AsyncListener listener : _lastAsyncListeners)
333 {
334 try
335 {
336 listener.onStartAsync(_event);
337 }
338 catch(Exception e)
339 {
340 Log.warn(e);
341 }
342 }
343 }
344
345 }
346
347
348
349
350
351
352
353
354
355 protected boolean unhandle()
356 {
357 synchronized (this)
358 {
359 switch(_state)
360 {
361 case __REDISPATCHED:
362 case __DISPATCHED:
363 _state=__UNCOMPLETED;
364 return true;
365
366 case __IDLE:
367 throw new IllegalStateException(this.getStatusString());
368
369 case __ASYNCSTARTED:
370 _initial=false;
371 _state=__ASYNCWAIT;
372 scheduleTimeout();
373 if (_state==__ASYNCWAIT)
374 return true;
375 else if (_state==__COMPLETING)
376 {
377 _state=__UNCOMPLETED;
378 return true;
379 }
380
381 _initial=false;
382 _state=__REDISPATCHED;
383 return false;
384
385 case __REDISPATCHING:
386 _initial=false;
387 _state=__REDISPATCHED;
388 return false;
389
390 case __COMPLETING:
391 _initial=false;
392 _state=__UNCOMPLETED;
393 return true;
394
395 default:
396 throw new IllegalStateException(this.getStatusString());
397 }
398 }
399 }
400
401
402 public void dispatch()
403 {
404 boolean dispatch=false;
405 synchronized (this)
406 {
407 switch(_state)
408 {
409 case __ASYNCSTARTED:
410 _state=__REDISPATCHING;
411 _resumed=true;
412 return;
413
414 case __ASYNCWAIT:
415 dispatch=!_expired;
416 _state=__REDISPATCH;
417 _resumed=true;
418 break;
419
420 case __REDISPATCH:
421 return;
422
423 default:
424 throw new IllegalStateException(this.getStatusString());
425 }
426 }
427
428 if (dispatch)
429 {
430 cancelTimeout();
431 scheduleDispatch();
432 }
433 }
434
435
436 protected void expired()
437 {
438 final List<ContinuationListener> cListeners;
439 final List<AsyncListener> aListeners;
440 synchronized (this)
441 {
442 switch(_state)
443 {
444 case __ASYNCSTARTED:
445 case __ASYNCWAIT:
446 cListeners=_continuationListeners;
447 aListeners=_asyncListeners;
448 break;
449 default:
450 cListeners=null;
451 aListeners=null;
452 return;
453 }
454 _expired=true;
455 }
456
457 if (aListeners!=null)
458 {
459 for (AsyncListener listener : aListeners)
460 {
461 try
462 {
463 listener.onTimeout(_event);
464 }
465 catch(Exception e)
466 {
467 Log.warn(e);
468 }
469 }
470 }
471 if (cListeners!=null)
472 {
473 for (ContinuationListener listener : cListeners)
474 {
475 try
476 {
477 listener.onTimeout(this);
478 }
479 catch(Exception e)
480 {
481 Log.warn(e);
482 }
483 }
484 }
485
486
487
488 synchronized (this)
489 {
490 switch(_state)
491 {
492 case __ASYNCSTARTED:
493 case __ASYNCWAIT:
494 if (_continuation)
495 dispatch();
496 else
497
498 complete();
499 }
500 }
501
502 scheduleDispatch();
503 }
504
505
506
507
508
509 public void complete()
510 {
511
512 boolean dispatch=false;
513 synchronized (this)
514 {
515 switch(_state)
516 {
517 case __DISPATCHED:
518 case __REDISPATCHED:
519 throw new IllegalStateException(this.getStatusString());
520
521 case __ASYNCSTARTED:
522 _state=__COMPLETING;
523 return;
524
525 case __ASYNCWAIT:
526 _state=__COMPLETING;
527 dispatch=!_expired;
528 break;
529
530 default:
531 throw new IllegalStateException(this.getStatusString());
532 }
533 }
534
535 if (dispatch)
536 {
537 cancelTimeout();
538 scheduleDispatch();
539 }
540 }
541
542
543 @Override
544 public <T extends AsyncListener> T createListener(Class<T> clazz) throws ServletException
545 {
546 try
547 {
548
549 return clazz.newInstance();
550 }
551 catch(Exception e)
552 {
553 throw new ServletException(e);
554 }
555 }
556
557
558
559
560
561
562 protected void doComplete(Throwable ex)
563 {
564 final List<ContinuationListener> cListeners;
565 final List<AsyncListener> aListeners;
566 synchronized (this)
567 {
568 switch(_state)
569 {
570 case __UNCOMPLETED:
571 _state=__COMPLETED;
572 cListeners=_continuationListeners;
573 aListeners=_asyncListeners;
574 break;
575
576 default:
577 cListeners=null;
578 aListeners=null;
579 throw new IllegalStateException(this.getStatusString());
580 }
581 }
582
583 if (aListeners!=null)
584 {
585 for (AsyncListener listener : aListeners)
586 {
587 try
588 {
589 if (ex!=null)
590 {
591 _event.getSuppliedRequest().setAttribute(Dispatcher.ERROR_EXCEPTION,ex);
592 _event.getSuppliedRequest().setAttribute(Dispatcher.ERROR_MESSAGE,ex.getMessage());
593 listener.onError(_event);
594 }
595 else
596 listener.onComplete(_event);
597 }
598 catch(Exception e)
599 {
600 Log.warn(e);
601 }
602 }
603 }
604 if (cListeners!=null)
605 {
606 for (ContinuationListener listener : cListeners)
607 {
608 try
609 {
610 listener.onComplete(this);
611 }
612 catch(Exception e)
613 {
614 Log.warn(e);
615 }
616 }
617 }
618 }
619
620
621 protected void recycle()
622 {
623 synchronized (this)
624 {
625 switch(_state)
626 {
627 case __DISPATCHED:
628 case __REDISPATCHED:
629 throw new IllegalStateException(getStatusString());
630 default:
631 _state=__IDLE;
632 }
633 _initial = true;
634 _resumed=false;
635 _expired=false;
636 _responseWrapped=false;
637 cancelTimeout();
638 _timeoutMs=DEFAULT_TIMEOUT;
639 _continuationListeners=null;
640 }
641 }
642
643
644 public void cancel()
645 {
646 synchronized (this)
647 {
648 cancelTimeout();
649 _continuationListeners=null;
650 }
651 }
652
653
654 protected void scheduleDispatch()
655 {
656 EndPoint endp=_connection.getEndPoint();
657 if (!endp.isBlocking())
658 {
659 ((AsyncEndPoint)endp).dispatch();
660 }
661 }
662
663
664 protected void scheduleTimeout()
665 {
666 EndPoint endp=_connection.getEndPoint();
667 if (_timeoutMs>0)
668 {
669 if (endp.isBlocking())
670 {
671 synchronized(this)
672 {
673 _expireAt = System.currentTimeMillis()+_timeoutMs;
674 long wait=_timeoutMs;
675 while (_expireAt>0 && wait>0 && _connection.getServer().isRunning())
676 {
677 try
678 {
679 this.wait(wait);
680 }
681 catch (InterruptedException e)
682 {
683 Log.ignore(e);
684 }
685 wait=_expireAt-System.currentTimeMillis();
686 }
687
688 if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning())
689 {
690 expired();
691 }
692 }
693 }
694 else
695 {
696 _connection.scheduleTimeout(_event._timeout,_timeoutMs);
697 }
698 }
699 }
700
701
702 protected void cancelTimeout()
703 {
704 EndPoint endp=_connection.getEndPoint();
705 if (endp.isBlocking())
706 {
707 synchronized(this)
708 {
709 _expireAt=0;
710 this.notifyAll();
711 }
712 }
713 else
714 {
715 final AsyncEventState event=_event;
716 if (event!=null)
717 _connection.cancelTimeout(event._timeout);
718 }
719 }
720
721
722 public boolean isCompleting()
723 {
724 synchronized (this)
725 {
726 return _state==__COMPLETING;
727 }
728 }
729
730
731 boolean isUncompleted()
732 {
733 synchronized (this)
734 {
735 return _state==__UNCOMPLETED;
736 }
737 }
738
739
740 public boolean isComplete()
741 {
742 synchronized (this)
743 {
744 return _state==__COMPLETED;
745 }
746 }
747
748
749
750 public boolean isAsyncStarted()
751 {
752 synchronized (this)
753 {
754 switch(_state)
755 {
756 case __ASYNCSTARTED:
757 case __REDISPATCHING:
758 case __REDISPATCH:
759 case __ASYNCWAIT:
760 return true;
761
762 default:
763 return false;
764 }
765 }
766 }
767
768
769
770 public boolean isAsync()
771 {
772 synchronized (this)
773 {
774 switch(_state)
775 {
776 case __IDLE:
777 case __DISPATCHED:
778 case __UNCOMPLETED:
779 case __COMPLETED:
780 return false;
781
782 default:
783 return true;
784 }
785 }
786 }
787
788
789 public void dispatch(ServletContext context, String path)
790 {
791 _event._dispatchContext=context;
792 _event._path=path;
793 dispatch();
794 }
795
796
797 public void dispatch(String path)
798 {
799 _event._path=path;
800 dispatch();
801 }
802
803
804 public Request getBaseRequest()
805 {
806 return _connection.getRequest();
807 }
808
809
810 public ServletRequest getRequest()
811 {
812 if (_event!=null)
813 return _event.getSuppliedRequest();
814 return _connection.getRequest();
815 }
816
817
818 public ServletResponse getResponse()
819 {
820 if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null)
821 return _event.getSuppliedResponse();
822 return _connection.getResponse();
823 }
824
825
826 public void start(final Runnable run)
827 {
828 final AsyncEventState event=_event;
829 if (event!=null)
830 {
831 _connection.getServer().getThreadPool().dispatch(new Runnable()
832 {
833 public void run()
834 {
835 ((Context)event.getServletContext()).getContextHandler().handle(run);
836 }
837 });
838 }
839 }
840
841
842 public boolean hasOriginalRequestAndResponse()
843 {
844 synchronized (this)
845 {
846 return (_event!=null && _event.getSuppliedRequest()==_connection._request && _event.getSuppliedResponse()==_connection._response);
847 }
848 }
849
850
851 public ContextHandler getContextHandler()
852 {
853 final AsyncEventState event=_event;
854 if (event!=null)
855 return ((Context)event.getServletContext()).getContextHandler();
856 return null;
857 }
858
859
860
861
862
863
864 public boolean isResumed()
865 {
866 synchronized (this)
867 {
868 return _resumed;
869 }
870 }
871
872
873
874
875 public boolean isExpired()
876 {
877 synchronized (this)
878 {
879 return _expired;
880 }
881 }
882
883
884
885
886
887 public void resume()
888 {
889 dispatch();
890 }
891
892
893
894
895
896 public void suspend(ServletResponse response)
897 {
898 _continuation=true;
899 if (response instanceof ServletResponseWrapper)
900 {
901 _responseWrapped=true;
902 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response);
903 }
904 else
905 {
906 _responseWrapped=false;
907 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());
908 }
909 }
910
911
912
913
914
915 public void suspend()
916 {
917 _responseWrapped=false;
918 _continuation=true;
919 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());
920 }
921
922
923
924
925
926 public ServletResponse getServletResponse()
927 {
928 if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null)
929 return _event.getSuppliedResponse();
930 return _connection.getResponse();
931 }
932
933
934
935
936
937 public Object getAttribute(String name)
938 {
939 return _connection.getRequest().getAttribute(name);
940 }
941
942
943
944
945
946 public void removeAttribute(String name)
947 {
948 _connection.getRequest().removeAttribute(name);
949 }
950
951
952
953
954
955 public void setAttribute(String name, Object attribute)
956 {
957 _connection.getRequest().setAttribute(name,attribute);
958 }
959
960
961
962
963
964 public void undispatch()
965 {
966 if (isSuspended())
967 {
968 if (Log.isDebugEnabled())
969 throw new ContinuationThrowable();
970 else
971 throw __exception;
972 }
973 throw new IllegalStateException("!suspended");
974 }
975
976
977
978 public class AsyncTimeout extends Timeout.Task implements Runnable
979 {
980 @Override
981 public void expired()
982 {
983 AsyncContinuation.this.expired();
984 }
985
986 @Override
987 public void run()
988 {
989 AsyncContinuation.this.expired();
990 }
991 }
992
993
994
995 public class AsyncEventState extends AsyncEvent
996 {
997 private final ServletContext _suspendedContext;
998 private ServletContext _dispatchContext;
999 private String _path;
1000 private Timeout.Task _timeout= new AsyncTimeout();
1001
1002 public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
1003 {
1004 super(AsyncContinuation.this, request,response);
1005 _suspendedContext=context;
1006 }
1007
1008 public ServletContext getSuspendedContext()
1009 {
1010 return _suspendedContext;
1011 }
1012
1013 public ServletContext getDispatchContext()
1014 {
1015 return _dispatchContext;
1016 }
1017
1018 public ServletContext getServletContext()
1019 {
1020 return _dispatchContext==null?_suspendedContext:_dispatchContext;
1021 }
1022
1023 public String getPath()
1024 {
1025 return _path;
1026 }
1027 }
1028 }