1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server;
15
16 import java.util.ArrayList;
17 import java.util.List;
18
19 import javax.servlet.ServletContext;
20 import javax.servlet.ServletRequest;
21 import javax.servlet.ServletResponse;
22
23 import org.eclipse.jetty.continuation.Continuation;
24 import org.eclipse.jetty.continuation.ContinuationListener;
25 import org.eclipse.jetty.continuation.ContinuationThrowable;
26 import org.eclipse.jetty.io.AsyncEndPoint;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.server.handler.ContextHandler;
29 import org.eclipse.jetty.server.handler.ContextHandler.Context;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32 import org.eclipse.jetty.util.thread.Timeout;
33
34
35
36
37
38 public class AsyncContinuation implements AsyncContext, Continuation
39 {
40 private static final Logger LOG = Log.getLogger(AsyncContinuation.class);
41
42 private final static long DEFAULT_TIMEOUT=30000L;
43
44 private final static ContinuationThrowable __exception = new ContinuationThrowable();
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 private static final int __IDLE=0;
60 private static final int __DISPATCHED=1;
61 private static final int __ASYNCSTARTED=2;
62 private static final int __REDISPATCHING=3;
63 private static final int __ASYNCWAIT=4;
64 private static final int __REDISPATCH=5;
65 private static final int __REDISPATCHED=6;
66 private static final int __COMPLETING=7;
67 private static final int __UNCOMPLETED=8;
68 private static final int __COMPLETED=9;
69
70
71
72 protected AbstractHttpConnection _connection;
73 private List<ContinuationListener> _continuationListeners;
74
75
76 private int _state;
77 private boolean _initial;
78 private boolean _resumed;
79 private boolean _expired;
80 private volatile boolean _responseWrapped;
81 private long _timeoutMs=DEFAULT_TIMEOUT;
82 private AsyncEventState _event;
83 private volatile long _expireAt;
84
85
86 protected AsyncContinuation()
87 {
88 _state=__IDLE;
89 _initial=true;
90 }
91
92
93 protected void setConnection(final AbstractHttpConnection connection)
94 {
95 synchronized(this)
96 {
97 _connection=connection;
98 }
99 }
100
101
102 public void addContinuationListener(ContinuationListener listener)
103 {
104 synchronized(this)
105 {
106 if (_continuationListeners==null)
107 _continuationListeners=new ArrayList<ContinuationListener>();
108 _continuationListeners.add(listener);
109 }
110 }
111
112
113 public void setTimeout(long ms)
114 {
115 synchronized(this)
116 {
117 _timeoutMs=ms;
118 }
119 }
120
121
122 public long getTimeout()
123 {
124 synchronized(this)
125 {
126 return _timeoutMs;
127 }
128 }
129
130
131 public AsyncEventState getAsyncEventState()
132 {
133 synchronized(this)
134 {
135 return _event;
136 }
137 }
138
139
140
141
142
143
144
145
146
147
148 public boolean isResponseWrapped()
149 {
150 return _responseWrapped;
151 }
152
153
154
155
156
157 public boolean isInitial()
158 {
159 synchronized(this)
160 {
161 return _initial;
162 }
163 }
164
165
166
167
168
169 public boolean isSuspended()
170 {
171 synchronized(this)
172 {
173 switch(_state)
174 {
175 case __ASYNCSTARTED:
176 case __REDISPATCHING:
177 case __COMPLETING:
178 case __ASYNCWAIT:
179 return true;
180
181 default:
182 return false;
183 }
184 }
185 }
186
187
188 public boolean isSuspending()
189 {
190 synchronized(this)
191 {
192 switch(_state)
193 {
194 case __ASYNCSTARTED:
195 case __ASYNCWAIT:
196 return true;
197
198 default:
199 return false;
200 }
201 }
202 }
203
204
205 public boolean isDispatchable()
206 {
207 synchronized(this)
208 {
209 switch(_state)
210 {
211 case __REDISPATCH:
212 case __REDISPATCHED:
213 case __REDISPATCHING:
214 case __COMPLETING:
215 return true;
216
217 default:
218 return false;
219 }
220 }
221 }
222
223
224 @Override
225 public String toString()
226 {
227 synchronized (this)
228 {
229 return super.toString()+"@"+getStatusString();
230 }
231 }
232
233
234 public String getStatusString()
235 {
236 synchronized (this)
237 {
238 return
239 ((_state==__IDLE)?"IDLE":
240 (_state==__DISPATCHED)?"DISPATCHED":
241 (_state==__ASYNCSTARTED)?"ASYNCSTARTED":
242 (_state==__ASYNCWAIT)?"ASYNCWAIT":
243 (_state==__REDISPATCHING)?"REDISPATCHING":
244 (_state==__REDISPATCH)?"REDISPATCH":
245 (_state==__REDISPATCHED)?"REDISPATCHED":
246 (_state==__COMPLETING)?"COMPLETING":
247 (_state==__UNCOMPLETED)?"UNCOMPLETED":
248 (_state==__COMPLETED)?"COMPLETE":
249 ("UNKNOWN?"+_state))+
250 (_initial?",initial":"")+
251 (_resumed?",resumed":"")+
252 (_expired?",expired":"");
253 }
254 }
255
256
257
258
259
260 protected boolean handling()
261 {
262 synchronized (this)
263 {
264 _responseWrapped=false;
265
266 switch(_state)
267 {
268 case __IDLE:
269 _initial=true;
270 _state=__DISPATCHED;
271 return true;
272
273 case __COMPLETING:
274 _state=__UNCOMPLETED;
275 return false;
276
277 case __ASYNCWAIT:
278 return false;
279
280 case __REDISPATCH:
281 _state=__REDISPATCHED;
282 return true;
283
284 default:
285 throw new IllegalStateException(this.getStatusString());
286 }
287 }
288 }
289
290
291
292
293
294 protected void suspend(final ServletContext context,
295 final ServletRequest request,
296 final ServletResponse response)
297 {
298 synchronized (this)
299 {
300 switch(_state)
301 {
302 case __DISPATCHED:
303 case __REDISPATCHED:
304 _resumed=false;
305 _expired=false;
306
307 if (_event==null || request!=_event.getRequest() || response != _event.getResponse() || context != _event.getServletContext())
308 _event=new AsyncEventState(context,request,response);
309 else
310 {
311 _event._dispatchContext=null;
312 _event._path=null;
313 }
314
315 _state=__ASYNCSTARTED;
316 break;
317
318 default:
319 throw new IllegalStateException(this.getStatusString());
320 }
321 }
322
323 }
324
325
326
327
328
329
330
331
332
333 protected boolean unhandle()
334 {
335 synchronized (this)
336 {
337 List<ContinuationListener> listeners=_continuationListeners;
338
339 switch(_state)
340 {
341 case __REDISPATCHED:
342 case __DISPATCHED:
343 _state=__UNCOMPLETED;
344 return true;
345
346 case __IDLE:
347 throw new IllegalStateException(this.getStatusString());
348
349 case __ASYNCSTARTED:
350 _initial=false;
351 _state=__ASYNCWAIT;
352 scheduleTimeout();
353 if (_state==__ASYNCWAIT)
354 return true;
355 else if (_state==__COMPLETING)
356 {
357 _state=__UNCOMPLETED;
358 return true;
359 }
360 _initial=false;
361 _state=__REDISPATCHED;
362 return false;
363
364 case __REDISPATCHING:
365 _initial=false;
366 _state=__REDISPATCHED;
367 return false;
368
369 case __COMPLETING:
370 _initial=false;
371 _state=__UNCOMPLETED;
372 return true;
373
374 default:
375 throw new IllegalStateException(this.getStatusString());
376 }
377 }
378 }
379
380
381 public void dispatch()
382 {
383 boolean dispatch=false;
384 synchronized (this)
385 {
386 switch(_state)
387 {
388 case __ASYNCSTARTED:
389 _state=__REDISPATCHING;
390 _resumed=true;
391 return;
392
393 case __ASYNCWAIT:
394 dispatch=!_expired;
395 _state=__REDISPATCH;
396 _resumed=true;
397 break;
398
399 case __REDISPATCH:
400 return;
401
402 default:
403 throw new IllegalStateException(this.getStatusString());
404 }
405 }
406
407 if (dispatch)
408 {
409 cancelTimeout();
410 scheduleDispatch();
411 }
412 }
413
414
415 protected void expired()
416 {
417 final List<ContinuationListener> listeners;
418 synchronized (this)
419 {
420 switch(_state)
421 {
422 case __ASYNCSTARTED:
423 case __ASYNCWAIT:
424 listeners=_continuationListeners;
425 break;
426 default:
427 listeners=null;
428 return;
429 }
430 _expired=true;
431 }
432
433 if (listeners!=null)
434 {
435 for (int i=0;i<listeners.size();i++)
436 {
437 ContinuationListener listener=listeners.get(i);
438 try
439 {
440 listener.onTimeout(this);
441 }
442 catch(Exception e)
443 {
444 LOG.warn(e);
445 }
446 }
447 }
448
449 synchronized (this)
450 {
451 switch(_state)
452 {
453 case __ASYNCSTARTED:
454 case __ASYNCWAIT:
455 dispatch();
456 }
457 }
458
459 scheduleDispatch();
460 }
461
462
463
464
465
466 public void complete()
467 {
468
469 boolean dispatch=false;
470 synchronized (this)
471 {
472 switch(_state)
473 {
474 case __DISPATCHED:
475 case __REDISPATCHED:
476 throw new IllegalStateException(this.getStatusString());
477
478 case __ASYNCSTARTED:
479 _state=__COMPLETING;
480 return;
481
482 case __ASYNCWAIT:
483 _state=__COMPLETING;
484 dispatch=!_expired;
485 break;
486
487 default:
488 throw new IllegalStateException(this.getStatusString());
489 }
490 }
491
492 if (dispatch)
493 {
494 cancelTimeout();
495 scheduleDispatch();
496 }
497 }
498
499
500
501
502
503
504 protected void doComplete()
505 {
506 final List<ContinuationListener> listeners;
507 synchronized (this)
508 {
509 switch(_state)
510 {
511 case __UNCOMPLETED:
512 _state=__COMPLETED;
513 listeners=_continuationListeners;
514 break;
515
516 default:
517 listeners=null;
518 throw new IllegalStateException(this.getStatusString());
519 }
520 }
521
522 if (listeners!=null)
523 {
524 for(int i=0;i<listeners.size();i++)
525 {
526 try
527 {
528 listeners.get(i).onComplete(this);
529 }
530 catch(Exception e)
531 {
532 LOG.warn(e);
533 }
534 }
535 }
536 }
537
538
539 protected void recycle()
540 {
541 synchronized (this)
542 {
543
544 switch(_state)
545 {
546 case __DISPATCHED:
547 case __REDISPATCHED:
548 throw new IllegalStateException(getStatusString());
549 default:
550 _state=__IDLE;
551 }
552 _initial = true;
553 _resumed=false;
554 _expired=false;
555 _responseWrapped=false;
556 cancelTimeout();
557 _timeoutMs=DEFAULT_TIMEOUT;
558 _continuationListeners=null;
559 }
560 }
561
562
563 public void cancel()
564 {
565 synchronized (this)
566 {
567 cancelTimeout();
568 _continuationListeners=null;
569 }
570 }
571
572
573 protected void scheduleDispatch()
574 {
575 EndPoint endp=_connection.getEndPoint();
576 if (!endp.isBlocking())
577 {
578 ((AsyncEndPoint)endp).asyncDispatch();
579 }
580 }
581
582
583 protected void scheduleTimeout()
584 {
585 EndPoint endp=_connection.getEndPoint();
586 if (_timeoutMs>0)
587 {
588 if (endp.isBlocking())
589 {
590 synchronized(this)
591 {
592 _expireAt = System.currentTimeMillis()+_timeoutMs;
593 long wait=_timeoutMs;
594 while (_expireAt>0 && wait>0 && _connection.getServer().isRunning())
595 {
596 try
597 {
598 this.wait(wait);
599 }
600 catch (InterruptedException e)
601 {
602 LOG.ignore(e);
603 }
604 wait=_expireAt-System.currentTimeMillis();
605 }
606
607 if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning())
608 {
609 expired();
610 }
611 }
612 }
613 else
614 {
615 ((AsyncEndPoint)endp).scheduleTimeout(_event,_timeoutMs);
616 }
617 }
618 }
619
620
621 protected void cancelTimeout()
622 {
623 EndPoint endp=_connection.getEndPoint();
624 if (endp.isBlocking())
625 {
626 synchronized(this)
627 {
628 _expireAt=0;
629 this.notifyAll();
630 }
631 }
632 else
633 {
634 final AsyncEventState event=_event;
635 if (event!=null)
636 {
637 ((AsyncEndPoint)endp).cancelTimeout(event);
638 }
639 }
640 }
641
642
643 public boolean isCompleting()
644 {
645 synchronized (this)
646 {
647 return _state==__COMPLETING;
648 }
649 }
650
651
652 boolean isUncompleted()
653 {
654 synchronized (this)
655 {
656 return _state==__UNCOMPLETED;
657 }
658 }
659
660
661 public boolean isComplete()
662 {
663 synchronized (this)
664 {
665 return _state==__COMPLETED;
666 }
667 }
668
669
670
671 public boolean isAsyncStarted()
672 {
673 synchronized (this)
674 {
675 switch(_state)
676 {
677 case __ASYNCSTARTED:
678 case __REDISPATCHING:
679 case __REDISPATCH:
680 case __ASYNCWAIT:
681 return true;
682
683 default:
684 return false;
685 }
686 }
687 }
688
689
690
691 public boolean isAsync()
692 {
693 synchronized (this)
694 {
695 switch(_state)
696 {
697 case __IDLE:
698 case __DISPATCHED:
699 case __UNCOMPLETED:
700 case __COMPLETED:
701 return false;
702
703 default:
704 return true;
705 }
706 }
707 }
708
709
710 public void dispatch(ServletContext context, String path)
711 {
712 _event._dispatchContext=context;
713 _event._path=path;
714 dispatch();
715 }
716
717
718 public void dispatch(String path)
719 {
720 _event._path=path;
721 dispatch();
722 }
723
724
725 public Request getBaseRequest()
726 {
727 return _connection.getRequest();
728 }
729
730
731 public ServletRequest getRequest()
732 {
733 if (_event!=null)
734 return _event.getRequest();
735 return _connection.getRequest();
736 }
737
738
739 public ServletResponse getResponse()
740 {
741 if (_event!=null)
742 return _event.getResponse();
743 return _connection.getResponse();
744 }
745
746
747 public void start(final Runnable run)
748 {
749 final AsyncEventState event=_event;
750 if (event!=null)
751 {
752 _connection.getServer().getThreadPool().dispatch(new Runnable()
753 {
754 public void run()
755 {
756 ((Context)event.getServletContext()).getContextHandler().handle(run);
757 }
758 });
759 }
760 }
761
762
763 public boolean hasOriginalRequestAndResponse()
764 {
765 synchronized (this)
766 {
767 return (_event!=null && _event.getRequest()==_connection._request && _event.getResponse()==_connection._response);
768 }
769 }
770
771
772 public ContextHandler getContextHandler()
773 {
774 final AsyncEventState event=_event;
775 if (event!=null)
776 return ((Context)event.getServletContext()).getContextHandler();
777 return null;
778 }
779
780
781
782
783
784
785 public boolean isResumed()
786 {
787 synchronized (this)
788 {
789 return _resumed;
790 }
791 }
792
793
794
795
796 public boolean isExpired()
797 {
798 synchronized (this)
799 {
800 return _expired;
801 }
802 }
803
804
805
806
807
808 public void resume()
809 {
810 dispatch();
811 }
812
813
814
815
816
817 public void suspend(ServletResponse response)
818 {
819 _responseWrapped=!(response instanceof Response);
820 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response);
821 }
822
823
824
825
826
827 public void suspend()
828 {
829 _responseWrapped=false;
830 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());
831 }
832
833
834
835
836
837 public ServletResponse getServletResponse()
838 {
839 if (_responseWrapped && _event!=null && _event.getResponse()!=null)
840 return _event.getResponse();
841 return _connection.getResponse();
842 }
843
844
845
846
847
848 public Object getAttribute(String name)
849 {
850 return _connection.getRequest().getAttribute(name);
851 }
852
853
854
855
856
857 public void removeAttribute(String name)
858 {
859 _connection.getRequest().removeAttribute(name);
860 }
861
862
863
864
865
866 public void setAttribute(String name, Object attribute)
867 {
868 _connection.getRequest().setAttribute(name,attribute);
869 }
870
871
872
873
874
875 public void undispatch()
876 {
877 if (isSuspended())
878 {
879 if (LOG.isDebugEnabled())
880 throw new ContinuationThrowable();
881 else
882 throw __exception;
883 }
884 throw new IllegalStateException("!suspended");
885 }
886
887
888
889 public class AsyncEventState extends Timeout.Task implements Runnable
890 {
891 private final ServletContext _suspendedContext;
892 private final ServletRequest _request;
893 private final ServletResponse _response;
894 private ServletContext _dispatchContext;
895 private String _path;
896
897 public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
898 {
899 _suspendedContext=context;
900 _request=request;
901 _response=response;
902 }
903
904 public ServletContext getSuspendedContext()
905 {
906 return _suspendedContext;
907 }
908
909 public ServletContext getDispatchContext()
910 {
911 return _dispatchContext;
912 }
913
914 public ServletContext getServletContext()
915 {
916 return _dispatchContext==null?_suspendedContext:_dispatchContext;
917 }
918
919 public ServletRequest getRequest()
920 {
921 return _request;
922 }
923
924 public ServletResponse getResponse()
925 {
926 return _response;
927 }
928
929 public String getPath()
930 {
931 return _path;
932 }
933
934 @Override
935 public void expired()
936 {
937 AsyncContinuation.this.expired();
938 }
939
940 public void run()
941 {
942 AsyncContinuation.this.expired();
943 }
944 }
945 }