1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server;
15
16 import java.util.ArrayList;
17 import java.util.List;
18
19 import javax.servlet.ServletContext;
20 import javax.servlet.ServletRequest;
21 import javax.servlet.ServletResponse;
22
23 import org.eclipse.jetty.continuation.Continuation;
24 import org.eclipse.jetty.continuation.ContinuationListener;
25 import org.eclipse.jetty.continuation.ContinuationThrowable;
26 import org.eclipse.jetty.io.AsyncEndPoint;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.server.handler.ContextHandler;
29 import org.eclipse.jetty.server.handler.ContextHandler.Context;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.thread.Timeout;
32
33
34
35
36
37 public class AsyncContinuation implements AsyncContext, Continuation
38 {
39 private final static long DEFAULT_TIMEOUT=30000L;
40
41 private final static ContinuationThrowable __exception = new ContinuationThrowable();
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 private static final int __IDLE=0;
57 private static final int __DISPATCHED=1;
58 private static final int __ASYNCSTARTED=2;
59 private static final int __REDISPATCHING=3;
60 private static final int __ASYNCWAIT=4;
61 private static final int __REDISPATCH=5;
62 private static final int __REDISPATCHED=6;
63 private static final int __COMPLETING=7;
64 private static final int __UNCOMPLETED=8;
65 private static final int __COMPLETED=9;
66
67
68
69 protected HttpConnection _connection;
70 private List<ContinuationListener> _continuationListeners;
71
72
73 private int _state;
74 private boolean _initial;
75 private boolean _resumed;
76 private boolean _expired;
77 private volatile boolean _responseWrapped;
78 private long _timeoutMs=DEFAULT_TIMEOUT;
79 private AsyncEventState _event;
80 private volatile long _expireAt;
81
82
83 protected AsyncContinuation()
84 {
85 _state=__IDLE;
86 _initial=true;
87 }
88
89
90 protected void setConnection(final HttpConnection connection)
91 {
92 synchronized(this)
93 {
94 _connection=connection;
95 }
96 }
97
98
99 public void addContinuationListener(ContinuationListener listener)
100 {
101 synchronized(this)
102 {
103 if (_continuationListeners==null)
104 _continuationListeners=new ArrayList<ContinuationListener>();
105 _continuationListeners.add(listener);
106 }
107 }
108
109
110 public void setTimeout(long ms)
111 {
112 synchronized(this)
113 {
114 _timeoutMs=ms;
115 }
116 }
117
118
119 public long getTimeout()
120 {
121 synchronized(this)
122 {
123 return _timeoutMs;
124 }
125 }
126
127
128 public AsyncEventState getAsyncEventState()
129 {
130 synchronized(this)
131 {
132 return _event;
133 }
134 }
135
136
137
138
139
140
141
142
143
144
145 public boolean isResponseWrapped()
146 {
147 return _responseWrapped;
148 }
149
150
151
152
153
154 public boolean isInitial()
155 {
156 synchronized(this)
157 {
158 return _initial;
159 }
160 }
161
162
163
164
165
166 public boolean isSuspended()
167 {
168 synchronized(this)
169 {
170 switch(_state)
171 {
172 case __ASYNCSTARTED:
173 case __REDISPATCHING:
174 case __COMPLETING:
175 case __ASYNCWAIT:
176 return true;
177
178 default:
179 return false;
180 }
181 }
182 }
183
184
185 @Override
186 public String toString()
187 {
188 synchronized (this)
189 {
190 return super.toString()+"@"+getStatusString();
191 }
192 }
193
194
195 public String getStatusString()
196 {
197 synchronized (this)
198 {
199 return
200 ((_state==__IDLE)?"IDLE":
201 (_state==__DISPATCHED)?"DISPATCHED":
202 (_state==__ASYNCSTARTED)?"ASYNCSTARTED":
203 (_state==__ASYNCWAIT)?"ASYNCWAIT":
204 (_state==__REDISPATCHING)?"REDISPATCHING":
205 (_state==__REDISPATCH)?"REDISPATCH":
206 (_state==__REDISPATCHED)?"REDISPATCHED":
207 (_state==__COMPLETING)?"COMPLETING":
208 (_state==__UNCOMPLETED)?"UNCOMPLETED":
209 (_state==__COMPLETED)?"COMPLETE":
210 ("UNKNOWN?"+_state))+
211 (_initial?",initial":"")+
212 (_resumed?",resumed":"")+
213 (_expired?",expired":"");
214 }
215 }
216
217
218
219
220
221 protected boolean handling()
222 {
223 synchronized (this)
224 {
225 _responseWrapped=false;
226
227 switch(_state)
228 {
229 case __IDLE:
230 _initial=true;
231 _state=__DISPATCHED;
232 return true;
233
234 case __COMPLETING:
235 _state=__UNCOMPLETED;
236 return false;
237
238 case __ASYNCWAIT:
239 return false;
240
241 case __REDISPATCH:
242 _state=__REDISPATCHED;
243 return true;
244
245 default:
246 throw new IllegalStateException(this.getStatusString());
247 }
248 }
249 }
250
251
252
253
254
255 protected void suspend(final ServletContext context,
256 final ServletRequest request,
257 final ServletResponse response)
258 {
259 synchronized (this)
260 {
261 _resumed=false;
262 _expired=false;
263
264 if (_event==null || request!=_event.getRequest() || response != _event.getResponse() || context != _event.getServletContext())
265 _event=new AsyncEventState(context,request,response);
266 else
267 {
268 _event._dispatchContext=null;
269 _event._path=null;
270 }
271
272 switch(_state)
273 {
274 case __DISPATCHED:
275 case __REDISPATCHED:
276 _state=__ASYNCSTARTED;
277 break;
278
279 default:
280 throw new IllegalStateException(this.getStatusString());
281 }
282 }
283
284 }
285
286
287
288
289
290
291
292
293
294 protected boolean unhandle()
295 {
296 synchronized (this)
297 {
298 List<ContinuationListener> listeners=_continuationListeners;
299
300 switch(_state)
301 {
302 case __REDISPATCHED:
303 case __DISPATCHED:
304 _state=__UNCOMPLETED;
305 return true;
306
307 case __IDLE:
308 throw new IllegalStateException(this.getStatusString());
309
310 case __ASYNCSTARTED:
311 _initial=false;
312 _state=__ASYNCWAIT;
313 scheduleTimeout();
314 if (_state==__ASYNCWAIT)
315 return true;
316 else if (_state==__COMPLETING)
317 {
318 _state=__UNCOMPLETED;
319 return true;
320 }
321 _initial=false;
322 _state=__REDISPATCHED;
323 return false;
324
325 case __REDISPATCHING:
326 _initial=false;
327 _state=__REDISPATCHED;
328 return false;
329
330 case __COMPLETING:
331 _initial=false;
332 _state=__UNCOMPLETED;
333 return true;
334
335 default:
336 throw new IllegalStateException(this.getStatusString());
337 }
338 }
339 }
340
341
342 public void dispatch()
343 {
344 boolean dispatch=false;
345 synchronized (this)
346 {
347 switch(_state)
348 {
349 case __ASYNCSTARTED:
350 _state=__REDISPATCHING;
351 _resumed=true;
352 return;
353
354 case __ASYNCWAIT:
355 dispatch=!_expired;
356 _state=__REDISPATCH;
357 _resumed=true;
358 break;
359
360 case __REDISPATCH:
361 return;
362
363 default:
364 throw new IllegalStateException(this.getStatusString());
365 }
366 }
367
368 if (dispatch)
369 {
370 cancelTimeout();
371 scheduleDispatch();
372 }
373 }
374
375
376 protected void expired()
377 {
378 final List<ContinuationListener> listeners;
379 synchronized (this)
380 {
381 switch(_state)
382 {
383 case __ASYNCSTARTED:
384 case __ASYNCWAIT:
385 listeners=_continuationListeners;
386 break;
387 default:
388 listeners=null;
389 return;
390 }
391 _expired=true;
392 }
393
394 if (listeners!=null)
395 {
396 for (int i=0;i<listeners.size();i++)
397 {
398 ContinuationListener listener=listeners.get(i);
399 try
400 {
401 listener.onTimeout(this);
402 }
403 catch(Exception e)
404 {
405 Log.warn(e);
406 }
407 }
408 }
409
410 synchronized (this)
411 {
412 switch(_state)
413 {
414 case __ASYNCSTARTED:
415 case __ASYNCWAIT:
416 dispatch();
417 }
418 }
419
420 scheduleDispatch();
421 }
422
423
424
425
426
427 public void complete()
428 {
429
430 boolean dispatch=false;
431 synchronized (this)
432 {
433 switch(_state)
434 {
435 case __DISPATCHED:
436 case __REDISPATCHED:
437 throw new IllegalStateException(this.getStatusString());
438
439 case __ASYNCSTARTED:
440 _state=__COMPLETING;
441 return;
442
443 case __ASYNCWAIT:
444 _state=__COMPLETING;
445 dispatch=!_expired;
446 break;
447
448 default:
449 throw new IllegalStateException(this.getStatusString());
450 }
451 }
452
453 if (dispatch)
454 {
455 cancelTimeout();
456 scheduleDispatch();
457 }
458 }
459
460
461
462
463
464
465 protected void doComplete()
466 {
467 final List<ContinuationListener> listeners;
468 synchronized (this)
469 {
470 switch(_state)
471 {
472 case __UNCOMPLETED:
473 _state=__COMPLETED;
474 listeners=_continuationListeners;
475 break;
476
477 default:
478 listeners=null;
479 throw new IllegalStateException(this.getStatusString());
480 }
481 }
482
483 if (listeners!=null)
484 {
485 for(int i=0;i<listeners.size();i++)
486 {
487 try
488 {
489 listeners.get(i).onComplete(this);
490 }
491 catch(Exception e)
492 {
493 Log.warn(e);
494 }
495 }
496 }
497 }
498
499
500 protected void recycle()
501 {
502 synchronized (this)
503 {
504
505 switch(_state)
506 {
507 case __DISPATCHED:
508 case __REDISPATCHED:
509 throw new IllegalStateException(getStatusString());
510 default:
511 _state=__IDLE;
512 }
513 _initial = true;
514 _resumed=false;
515 _expired=false;
516 _responseWrapped=false;
517 cancelTimeout();
518 _timeoutMs=DEFAULT_TIMEOUT;
519 _continuationListeners=null;
520 }
521 }
522
523
524 public void cancel()
525 {
526 synchronized (this)
527 {
528 cancelTimeout();
529 _continuationListeners=null;
530 }
531 }
532
533
534 protected void scheduleDispatch()
535 {
536 EndPoint endp=_connection.getEndPoint();
537 if (!endp.isBlocking())
538 {
539 ((AsyncEndPoint)endp).dispatch();
540 }
541 }
542
543
544 protected void scheduleTimeout()
545 {
546 EndPoint endp=_connection.getEndPoint();
547 if (_timeoutMs>0)
548 {
549 if (endp.isBlocking())
550 {
551 synchronized(this)
552 {
553 _expireAt = System.currentTimeMillis()+_timeoutMs;
554 long wait=_timeoutMs;
555 while (_expireAt>0 && wait>0)
556 {
557 try
558 {
559 this.wait(wait);
560 }
561 catch (InterruptedException e)
562 {
563 Log.ignore(e);
564 }
565 wait=_expireAt-System.currentTimeMillis();
566 }
567
568 if (_expireAt>0 && wait<=0)
569 {
570 expired();
571 }
572 }
573 }
574 else
575 {
576 _connection.scheduleTimeout(_event,_timeoutMs);
577 }
578 }
579 }
580
581
582 protected void cancelTimeout()
583 {
584 EndPoint endp=_connection.getEndPoint();
585 if (endp.isBlocking())
586 {
587 synchronized(this)
588 {
589 _expireAt=0;
590 this.notifyAll();
591 }
592 }
593 else
594 {
595 final AsyncEventState event=_event;
596 if (event!=null)
597 _connection.cancelTimeout(event);
598 }
599 }
600
601
602 public boolean isCompleting()
603 {
604 synchronized (this)
605 {
606 return _state==__COMPLETING;
607 }
608 }
609
610
611 boolean isUncompleted()
612 {
613 synchronized (this)
614 {
615 return _state==__UNCOMPLETED;
616 }
617 }
618
619
620 public boolean isComplete()
621 {
622 synchronized (this)
623 {
624 return _state==__COMPLETED;
625 }
626 }
627
628
629
630 public boolean isAsyncStarted()
631 {
632 synchronized (this)
633 {
634 switch(_state)
635 {
636 case __ASYNCSTARTED:
637 case __REDISPATCHING:
638 case __REDISPATCH:
639 case __ASYNCWAIT:
640 return true;
641
642 default:
643 return false;
644 }
645 }
646 }
647
648
649
650 public boolean isAsync()
651 {
652 synchronized (this)
653 {
654 switch(_state)
655 {
656 case __IDLE:
657 case __DISPATCHED:
658 case __UNCOMPLETED:
659 case __COMPLETED:
660 return false;
661
662 default:
663 return true;
664 }
665 }
666 }
667
668
669 public void dispatch(ServletContext context, String path)
670 {
671 _event._dispatchContext=context;
672 _event._path=path;
673 dispatch();
674 }
675
676
677 public void dispatch(String path)
678 {
679 _event._path=path;
680 dispatch();
681 }
682
683
684 public Request getBaseRequest()
685 {
686 return _connection.getRequest();
687 }
688
689
690 public ServletRequest getRequest()
691 {
692 if (_event!=null)
693 return _event.getRequest();
694 return _connection.getRequest();
695 }
696
697
698 public ServletResponse getResponse()
699 {
700 if (_event!=null)
701 return _event.getResponse();
702 return _connection.getResponse();
703 }
704
705
706 public void start(Runnable run)
707 {
708 final AsyncEventState event=_event;
709 if (event!=null)
710 ((Context)event.getServletContext()).getContextHandler().handle(run);
711 }
712
713
714 public boolean hasOriginalRequestAndResponse()
715 {
716 synchronized (this)
717 {
718 return (_event!=null && _event.getRequest()==_connection._request && _event.getResponse()==_connection._response);
719 }
720 }
721
722
723 public ContextHandler getContextHandler()
724 {
725 final AsyncEventState event=_event;
726 if (event!=null)
727 return ((Context)event.getServletContext()).getContextHandler();
728 return null;
729 }
730
731
732
733
734
735
736 public boolean isResumed()
737 {
738 synchronized (this)
739 {
740 return _resumed;
741 }
742 }
743
744
745
746
747 public boolean isExpired()
748 {
749 synchronized (this)
750 {
751 return _expired;
752 }
753 }
754
755
756
757
758
759 public void resume()
760 {
761 dispatch();
762 }
763
764
765
766
767
768 public void suspend(ServletResponse response)
769 {
770 _responseWrapped=!(response instanceof Response);
771 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response);
772 }
773
774
775
776
777
778 public void suspend()
779 {
780 _responseWrapped=false;
781 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());
782 }
783
784
785
786
787
788 public ServletResponse getServletResponse()
789 {
790 if (_responseWrapped && _event!=null && _event.getResponse()!=null)
791 return _event.getResponse();
792 return _connection.getResponse();
793 }
794
795
796
797
798
799 public Object getAttribute(String name)
800 {
801 return _connection.getRequest().getAttribute(name);
802 }
803
804
805
806
807
808 public void removeAttribute(String name)
809 {
810 _connection.getRequest().removeAttribute(name);
811 }
812
813
814
815
816
817 public void setAttribute(String name, Object attribute)
818 {
819 _connection.getRequest().setAttribute(name,attribute);
820 }
821
822
823
824
825
826 public void undispatch()
827 {
828 if (isSuspended())
829 {
830 if (Log.isDebugEnabled())
831 throw new ContinuationThrowable();
832 else
833 throw __exception;
834 }
835 throw new IllegalStateException("!suspended");
836 }
837
838
839
840 public class AsyncEventState extends Timeout.Task implements Runnable
841 {
842 private final ServletContext _suspendedContext;
843 private final ServletRequest _request;
844 private final ServletResponse _response;
845 private ServletContext _dispatchContext;
846 private String _path;
847
848 public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
849 {
850 _suspendedContext=context;
851 _request=request;
852 _response=response;
853 }
854
855 public ServletContext getSuspendedContext()
856 {
857 return _suspendedContext;
858 }
859
860 public ServletContext getDispatchContext()
861 {
862 return _dispatchContext;
863 }
864
865 public ServletContext getServletContext()
866 {
867 return _dispatchContext==null?_suspendedContext:_dispatchContext;
868 }
869
870 public ServletRequest getRequest()
871 {
872 return _request;
873 }
874
875 public ServletResponse getResponse()
876 {
877 return _response;
878 }
879
880 public String getPath()
881 {
882 return _path;
883 }
884
885 @Override
886 public void expired()
887 {
888 AsyncContinuation.this.expired();
889 }
890
891 public void run()
892 {
893 AsyncContinuation.this.expired();
894 }
895 }
896 }