1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server;
15
16 import java.util.ArrayList;
17 import java.util.List;
18
19 import javax.servlet.ServletContext;
20 import javax.servlet.ServletRequest;
21 import javax.servlet.ServletResponse;
22
23 import org.eclipse.jetty.continuation.Continuation;
24 import org.eclipse.jetty.continuation.ContinuationListener;
25 import org.eclipse.jetty.continuation.ContinuationThrowable;
26 import org.eclipse.jetty.io.AsyncEndPoint;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.server.handler.ContextHandler;
29 import org.eclipse.jetty.server.handler.ContextHandler.Context;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.thread.Timeout;
32
33
34
35
36
37 public class AsyncContinuation implements AsyncContext, Continuation
38 {
39 private final static long DEFAULT_TIMEOUT=30000L;
40
41 private final static ContinuationThrowable __exception = new ContinuationThrowable();
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 private static final int __IDLE=0;
57 private static final int __DISPATCHED=1;
58 private static final int __ASYNCSTARTED=2;
59 private static final int __REDISPATCHING=3;
60 private static final int __ASYNCWAIT=4;
61 private static final int __REDISPATCH=5;
62 private static final int __REDISPATCHED=6;
63 private static final int __COMPLETING=7;
64 private static final int __UNCOMPLETED=8;
65 private static final int __COMPLETED=9;
66
67
68
69 protected HttpConnection _connection;
70 private List<ContinuationListener> _continuationListeners;
71
72
73 private int _state;
74 private boolean _initial;
75 private boolean _resumed;
76 private boolean _expired;
77 private volatile boolean _responseWrapped;
78 private long _timeoutMs=DEFAULT_TIMEOUT;
79 private AsyncEventState _event;
80 private volatile long _expireAt;
81
82
83 protected AsyncContinuation()
84 {
85 _state=__IDLE;
86 _initial=true;
87 }
88
89
90 protected void setConnection(final HttpConnection connection)
91 {
92 synchronized(this)
93 {
94 _connection=connection;
95 }
96 }
97
98
99 public void addContinuationListener(ContinuationListener listener)
100 {
101 synchronized(this)
102 {
103 if (_continuationListeners==null)
104 _continuationListeners=new ArrayList<ContinuationListener>();
105 _continuationListeners.add(listener);
106 }
107 }
108
109
110 public void setTimeout(long ms)
111 {
112 synchronized(this)
113 {
114 _timeoutMs=ms;
115 }
116 }
117
118
119 public long getTimeout()
120 {
121 synchronized(this)
122 {
123 return _timeoutMs;
124 }
125 }
126
127
128 public AsyncEventState getAsyncEventState()
129 {
130 synchronized(this)
131 {
132 return _event;
133 }
134 }
135
136
137
138
139
140
141
142
143
144
145 public boolean isResponseWrapped()
146 {
147 return _responseWrapped;
148 }
149
150
151
152
153
154 public boolean isInitial()
155 {
156 synchronized(this)
157 {
158 return _initial;
159 }
160 }
161
162
163
164
165
166 public boolean isSuspended()
167 {
168 synchronized(this)
169 {
170 switch(_state)
171 {
172 case __ASYNCSTARTED:
173 case __REDISPATCHING:
174 case __COMPLETING:
175 case __ASYNCWAIT:
176 return true;
177
178 default:
179 return false;
180 }
181 }
182 }
183
184
185 @Override
186 public String toString()
187 {
188 synchronized (this)
189 {
190 return super.toString()+"@"+getStatusString();
191 }
192 }
193
194
195 public String getStatusString()
196 {
197 synchronized (this)
198 {
199 return
200 ((_state==__IDLE)?"IDLE":
201 (_state==__DISPATCHED)?"DISPATCHED":
202 (_state==__ASYNCSTARTED)?"ASYNCSTARTED":
203 (_state==__ASYNCWAIT)?"ASYNCWAIT":
204 (_state==__REDISPATCHING)?"REDISPATCHING":
205 (_state==__REDISPATCH)?"REDISPATCH":
206 (_state==__REDISPATCHED)?"REDISPATCHED":
207 (_state==__COMPLETING)?"COMPLETING":
208 (_state==__UNCOMPLETED)?"UNCOMPLETED":
209 (_state==__COMPLETED)?"COMPLETE":
210 ("UNKNOWN?"+_state))+
211 (_initial?",initial":"")+
212 (_resumed?",resumed":"")+
213 (_expired?",expired":"");
214 }
215 }
216
217
218
219
220
221 protected boolean handling()
222 {
223 synchronized (this)
224 {
225 _responseWrapped=false;
226
227 switch(_state)
228 {
229 case __IDLE:
230 _initial=true;
231 _state=__DISPATCHED;
232 return true;
233
234 case __COMPLETING:
235 _state=__UNCOMPLETED;
236 return false;
237
238 case __ASYNCWAIT:
239 return false;
240
241 case __REDISPATCH:
242 _state=__REDISPATCHED;
243 return true;
244
245 default:
246 throw new IllegalStateException(this.getStatusString());
247 }
248 }
249 }
250
251
252
253
254
255 protected void suspend(final ServletContext context,
256 final ServletRequest request,
257 final ServletResponse response)
258 {
259 synchronized (this)
260 {
261 switch(_state)
262 {
263 case __DISPATCHED:
264 case __REDISPATCHED:
265 _resumed=false;
266 _expired=false;
267
268 if (_event==null || request!=_event.getRequest() || response != _event.getResponse() || context != _event.getServletContext())
269 _event=new AsyncEventState(context,request,response);
270 else
271 {
272 _event._dispatchContext=null;
273 _event._path=null;
274 }
275
276 _state=__ASYNCSTARTED;
277 break;
278
279 default:
280 throw new IllegalStateException(this.getStatusString());
281 }
282 }
283
284 }
285
286
287
288
289
290
291
292
293
294 protected boolean unhandle()
295 {
296 synchronized (this)
297 {
298 List<ContinuationListener> listeners=_continuationListeners;
299
300 switch(_state)
301 {
302 case __REDISPATCHED:
303 case __DISPATCHED:
304 _state=__UNCOMPLETED;
305 return true;
306
307 case __IDLE:
308 throw new IllegalStateException(this.getStatusString());
309
310 case __ASYNCSTARTED:
311 _initial=false;
312 _state=__ASYNCWAIT;
313 scheduleTimeout();
314 if (_state==__ASYNCWAIT)
315 return true;
316 else if (_state==__COMPLETING)
317 {
318 _state=__UNCOMPLETED;
319 return true;
320 }
321 _initial=false;
322 _state=__REDISPATCHED;
323 return false;
324
325 case __REDISPATCHING:
326 _initial=false;
327 _state=__REDISPATCHED;
328 return false;
329
330 case __COMPLETING:
331 _initial=false;
332 _state=__UNCOMPLETED;
333 return true;
334
335 default:
336 throw new IllegalStateException(this.getStatusString());
337 }
338 }
339 }
340
341
342 public void dispatch()
343 {
344 boolean dispatch=false;
345 synchronized (this)
346 {
347 switch(_state)
348 {
349 case __ASYNCSTARTED:
350 _state=__REDISPATCHING;
351 _resumed=true;
352 return;
353
354 case __ASYNCWAIT:
355 dispatch=!_expired;
356 _state=__REDISPATCH;
357 _resumed=true;
358 break;
359
360 case __REDISPATCH:
361 return;
362
363 default:
364 throw new IllegalStateException(this.getStatusString());
365 }
366 }
367
368 if (dispatch)
369 {
370 cancelTimeout();
371 scheduleDispatch();
372 }
373 }
374
375
376 protected void expired()
377 {
378 final List<ContinuationListener> listeners;
379 synchronized (this)
380 {
381 switch(_state)
382 {
383 case __ASYNCSTARTED:
384 case __ASYNCWAIT:
385 listeners=_continuationListeners;
386 break;
387 default:
388 listeners=null;
389 return;
390 }
391 _expired=true;
392 }
393
394 if (listeners!=null)
395 {
396 for (int i=0;i<listeners.size();i++)
397 {
398 ContinuationListener listener=listeners.get(i);
399 try
400 {
401 listener.onTimeout(this);
402 }
403 catch(Exception e)
404 {
405 Log.warn(e);
406 }
407 }
408 }
409
410 synchronized (this)
411 {
412 switch(_state)
413 {
414 case __ASYNCSTARTED:
415 case __ASYNCWAIT:
416 dispatch();
417 }
418 }
419
420 scheduleDispatch();
421 }
422
423
424
425
426
427 public void complete()
428 {
429
430 boolean dispatch=false;
431 synchronized (this)
432 {
433 switch(_state)
434 {
435 case __DISPATCHED:
436 case __REDISPATCHED:
437 throw new IllegalStateException(this.getStatusString());
438
439 case __ASYNCSTARTED:
440 _state=__COMPLETING;
441 return;
442
443 case __ASYNCWAIT:
444 _state=__COMPLETING;
445 dispatch=!_expired;
446 break;
447
448 default:
449 throw new IllegalStateException(this.getStatusString());
450 }
451 }
452
453 if (dispatch)
454 {
455 cancelTimeout();
456 scheduleDispatch();
457 }
458 }
459
460
461
462
463
464
465 protected void doComplete()
466 {
467 final List<ContinuationListener> listeners;
468 synchronized (this)
469 {
470 switch(_state)
471 {
472 case __UNCOMPLETED:
473 _state=__COMPLETED;
474 listeners=_continuationListeners;
475 break;
476
477 default:
478 listeners=null;
479 throw new IllegalStateException(this.getStatusString());
480 }
481 }
482
483 if (listeners!=null)
484 {
485 for(int i=0;i<listeners.size();i++)
486 {
487 try
488 {
489 listeners.get(i).onComplete(this);
490 }
491 catch(Exception e)
492 {
493 Log.warn(e);
494 }
495 }
496 }
497 }
498
499
500 protected void recycle()
501 {
502 synchronized (this)
503 {
504
505 switch(_state)
506 {
507 case __DISPATCHED:
508 case __REDISPATCHED:
509 throw new IllegalStateException(getStatusString());
510 default:
511 _state=__IDLE;
512 }
513 _initial = true;
514 _resumed=false;
515 _expired=false;
516 _responseWrapped=false;
517 cancelTimeout();
518 _timeoutMs=DEFAULT_TIMEOUT;
519 _continuationListeners=null;
520 }
521 }
522
523
524 public void cancel()
525 {
526 synchronized (this)
527 {
528 cancelTimeout();
529 _continuationListeners=null;
530 }
531 }
532
533
534 protected void scheduleDispatch()
535 {
536 EndPoint endp=_connection.getEndPoint();
537 if (!endp.isBlocking())
538 {
539 ((AsyncEndPoint)endp).dispatch();
540 }
541 }
542
543
544 protected void scheduleTimeout()
545 {
546 EndPoint endp=_connection.getEndPoint();
547 if (_timeoutMs>0)
548 {
549 if (endp.isBlocking())
550 {
551 synchronized(this)
552 {
553 _expireAt = System.currentTimeMillis()+_timeoutMs;
554 long wait=_timeoutMs;
555 while (_expireAt>0 && wait>0 && _connection.getServer().isRunning())
556 {
557 try
558 {
559 this.wait(wait);
560 }
561 catch (InterruptedException e)
562 {
563 Log.ignore(e);
564 }
565 wait=_expireAt-System.currentTimeMillis();
566 }
567
568 if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning())
569 {
570 expired();
571 }
572 }
573 }
574 else
575 {
576 _connection.scheduleTimeout(_event,_timeoutMs);
577 }
578 }
579 }
580
581
582 protected void cancelTimeout()
583 {
584 EndPoint endp=_connection.getEndPoint();
585 if (endp.isBlocking())
586 {
587 synchronized(this)
588 {
589 _expireAt=0;
590 this.notifyAll();
591 }
592 }
593 else
594 {
595 final AsyncEventState event=_event;
596 if (event!=null)
597 _connection.cancelTimeout(event);
598 }
599 }
600
601
602 public boolean isCompleting()
603 {
604 synchronized (this)
605 {
606 return _state==__COMPLETING;
607 }
608 }
609
610
611 boolean isUncompleted()
612 {
613 synchronized (this)
614 {
615 return _state==__UNCOMPLETED;
616 }
617 }
618
619
620 public boolean isComplete()
621 {
622 synchronized (this)
623 {
624 return _state==__COMPLETED;
625 }
626 }
627
628
629
630 public boolean isAsyncStarted()
631 {
632 synchronized (this)
633 {
634 switch(_state)
635 {
636 case __ASYNCSTARTED:
637 case __REDISPATCHING:
638 case __REDISPATCH:
639 case __ASYNCWAIT:
640 return true;
641
642 default:
643 return false;
644 }
645 }
646 }
647
648
649
650 public boolean isAsync()
651 {
652 synchronized (this)
653 {
654 switch(_state)
655 {
656 case __IDLE:
657 case __DISPATCHED:
658 case __UNCOMPLETED:
659 case __COMPLETED:
660 return false;
661
662 default:
663 return true;
664 }
665 }
666 }
667
668
669 public void dispatch(ServletContext context, String path)
670 {
671 _event._dispatchContext=context;
672 _event._path=path;
673 dispatch();
674 }
675
676
677 public void dispatch(String path)
678 {
679 _event._path=path;
680 dispatch();
681 }
682
683
684 public Request getBaseRequest()
685 {
686 return _connection.getRequest();
687 }
688
689
690 public ServletRequest getRequest()
691 {
692 if (_event!=null)
693 return _event.getRequest();
694 return _connection.getRequest();
695 }
696
697
698 public ServletResponse getResponse()
699 {
700 if (_event!=null)
701 return _event.getResponse();
702 return _connection.getResponse();
703 }
704
705
706 public void start(final Runnable run)
707 {
708 final AsyncEventState event=_event;
709 if (event!=null)
710 {
711 _connection.getServer().getThreadPool().dispatch(new Runnable()
712 {
713 public void run()
714 {
715 ((Context)event.getServletContext()).getContextHandler().handle(run);
716 }
717 });
718 }
719 }
720
721
722 public boolean hasOriginalRequestAndResponse()
723 {
724 synchronized (this)
725 {
726 return (_event!=null && _event.getRequest()==_connection._request && _event.getResponse()==_connection._response);
727 }
728 }
729
730
731 public ContextHandler getContextHandler()
732 {
733 final AsyncEventState event=_event;
734 if (event!=null)
735 return ((Context)event.getServletContext()).getContextHandler();
736 return null;
737 }
738
739
740
741
742
743
744 public boolean isResumed()
745 {
746 synchronized (this)
747 {
748 return _resumed;
749 }
750 }
751
752
753
754
755 public boolean isExpired()
756 {
757 synchronized (this)
758 {
759 return _expired;
760 }
761 }
762
763
764
765
766
767 public void resume()
768 {
769 dispatch();
770 }
771
772
773
774
775
776 public void suspend(ServletResponse response)
777 {
778 _responseWrapped=!(response instanceof Response);
779 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response);
780 }
781
782
783
784
785
786 public void suspend()
787 {
788 _responseWrapped=false;
789 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());
790 }
791
792
793
794
795
796 public ServletResponse getServletResponse()
797 {
798 if (_responseWrapped && _event!=null && _event.getResponse()!=null)
799 return _event.getResponse();
800 return _connection.getResponse();
801 }
802
803
804
805
806
807 public Object getAttribute(String name)
808 {
809 return _connection.getRequest().getAttribute(name);
810 }
811
812
813
814
815
816 public void removeAttribute(String name)
817 {
818 _connection.getRequest().removeAttribute(name);
819 }
820
821
822
823
824
825 public void setAttribute(String name, Object attribute)
826 {
827 _connection.getRequest().setAttribute(name,attribute);
828 }
829
830
831
832
833
834 public void undispatch()
835 {
836 if (isSuspended())
837 {
838 if (Log.isDebugEnabled())
839 throw new ContinuationThrowable();
840 else
841 throw __exception;
842 }
843 throw new IllegalStateException("!suspended");
844 }
845
846
847
848 public class AsyncEventState extends Timeout.Task implements Runnable
849 {
850 private final ServletContext _suspendedContext;
851 private final ServletRequest _request;
852 private final ServletResponse _response;
853 private ServletContext _dispatchContext;
854 private String _path;
855
856 public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
857 {
858 _suspendedContext=context;
859 _request=request;
860 _response=response;
861 }
862
863 public ServletContext getSuspendedContext()
864 {
865 return _suspendedContext;
866 }
867
868 public ServletContext getDispatchContext()
869 {
870 return _dispatchContext;
871 }
872
873 public ServletContext getServletContext()
874 {
875 return _dispatchContext==null?_suspendedContext:_dispatchContext;
876 }
877
878 public ServletRequest getRequest()
879 {
880 return _request;
881 }
882
883 public ServletResponse getResponse()
884 {
885 return _response;
886 }
887
888 public String getPath()
889 {
890 return _path;
891 }
892
893 @Override
894 public void expired()
895 {
896 AsyncContinuation.this.expired();
897 }
898
899 public void run()
900 {
901 AsyncContinuation.this.expired();
902 }
903 }
904 }