1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server;
15
16 import java.util.ArrayList;
17 import java.util.List;
18
19 import javax.servlet.ServletContext;
20 import javax.servlet.ServletRequest;
21 import javax.servlet.ServletResponse;
22
23 import org.eclipse.jetty.continuation.Continuation;
24 import org.eclipse.jetty.continuation.ContinuationListener;
25 import org.eclipse.jetty.continuation.ContinuationThrowable;
26 import org.eclipse.jetty.io.AsyncEndPoint;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.server.handler.ContextHandler;
29 import org.eclipse.jetty.server.handler.ContextHandler.Context;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32 import org.eclipse.jetty.util.thread.Timeout;
33
34
35
36
37
38 public class AsyncContinuation implements AsyncContext, Continuation
39 {
40 private static final Logger LOG = Log.getLogger(AsyncContinuation.class);
41
42 private final static long DEFAULT_TIMEOUT=30000L;
43
44 private final static ContinuationThrowable __exception = new ContinuationThrowable();
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 private static final int __IDLE=0;
60 private static final int __DISPATCHED=1;
61 private static final int __ASYNCSTARTED=2;
62 private static final int __REDISPATCHING=3;
63 private static final int __ASYNCWAIT=4;
64 private static final int __REDISPATCH=5;
65 private static final int __REDISPATCHED=6;
66 private static final int __COMPLETING=7;
67 private static final int __UNCOMPLETED=8;
68 private static final int __COMPLETED=9;
69
70
71
72 protected AbstractHttpConnection _connection;
73 private List<ContinuationListener> _continuationListeners;
74
75
76 private int _state;
77 private boolean _initial;
78 private boolean _resumed;
79 private boolean _expired;
80 private volatile boolean _responseWrapped;
81 private long _timeoutMs=DEFAULT_TIMEOUT;
82 private AsyncEventState _event;
83 private volatile long _expireAt;
84
85
/**
 * Creates a continuation that starts out idle with the initial flag set.
 */
protected AsyncContinuation()
{
    _initial = true;
    _state = __IDLE;
}
91
92
93 protected void setConnection(final AbstractHttpConnection connection)
94 {
95 synchronized(this)
96 {
97 _connection=connection;
98 }
99 }
100
101
102 public void addContinuationListener(ContinuationListener listener)
103 {
104 synchronized(this)
105 {
106 if (_continuationListeners==null)
107 _continuationListeners=new ArrayList<ContinuationListener>();
108 _continuationListeners.add(listener);
109 }
110 }
111
112
113 public void setTimeout(long ms)
114 {
115 synchronized(this)
116 {
117 _timeoutMs=ms;
118 }
119 }
120
121
122 public long getTimeout()
123 {
124 synchronized(this)
125 {
126 return _timeoutMs;
127 }
128 }
129
130
131 public AsyncEventState getAsyncEventState()
132 {
133 synchronized(this)
134 {
135 return _event;
136 }
137 }
138
139
140
141
142
143
144
145
146
147
148 public boolean isResponseWrapped()
149 {
150 return _responseWrapped;
151 }
152
153
154
155
156
157 public boolean isInitial()
158 {
159 synchronized(this)
160 {
161 return _initial;
162 }
163 }
164
165
166
167
168
169 public boolean isSuspended()
170 {
171 synchronized(this)
172 {
173 switch(_state)
174 {
175 case __ASYNCSTARTED:
176 case __REDISPATCHING:
177 case __COMPLETING:
178 case __ASYNCWAIT:
179 return true;
180
181 default:
182 return false;
183 }
184 }
185 }
186
187
188
189
190
191 public boolean isSuspending()
192 {
193 synchronized(this)
194 {
195 switch(_state)
196 {
197 case __ASYNCSTARTED:
198 case __ASYNCWAIT:
199 return true;
200
201 default:
202 return false;
203 }
204 }
205 }
206
207
208 @Override
209 public String toString()
210 {
211 synchronized (this)
212 {
213 return super.toString()+"@"+getStatusString();
214 }
215 }
216
217
218 public String getStatusString()
219 {
220 synchronized (this)
221 {
222 return
223 ((_state==__IDLE)?"IDLE":
224 (_state==__DISPATCHED)?"DISPATCHED":
225 (_state==__ASYNCSTARTED)?"ASYNCSTARTED":
226 (_state==__ASYNCWAIT)?"ASYNCWAIT":
227 (_state==__REDISPATCHING)?"REDISPATCHING":
228 (_state==__REDISPATCH)?"REDISPATCH":
229 (_state==__REDISPATCHED)?"REDISPATCHED":
230 (_state==__COMPLETING)?"COMPLETING":
231 (_state==__UNCOMPLETED)?"UNCOMPLETED":
232 (_state==__COMPLETED)?"COMPLETE":
233 ("UNKNOWN?"+_state))+
234 (_initial?",initial":"")+
235 (_resumed?",resumed":"")+
236 (_expired?",expired":"");
237 }
238 }
239
240
241
242
243
244 protected boolean handling()
245 {
246 synchronized (this)
247 {
248 _responseWrapped=false;
249
250 switch(_state)
251 {
252 case __IDLE:
253 _initial=true;
254 _state=__DISPATCHED;
255 return true;
256
257 case __COMPLETING:
258 _state=__UNCOMPLETED;
259 return false;
260
261 case __ASYNCWAIT:
262 return false;
263
264 case __REDISPATCH:
265 _state=__REDISPATCHED;
266 return true;
267
268 default:
269 throw new IllegalStateException(this.getStatusString());
270 }
271 }
272 }
273
274
275
276
277
278 protected void suspend(final ServletContext context,
279 final ServletRequest request,
280 final ServletResponse response)
281 {
282 synchronized (this)
283 {
284 switch(_state)
285 {
286 case __DISPATCHED:
287 case __REDISPATCHED:
288 _resumed=false;
289 _expired=false;
290
291 if (_event==null || request!=_event.getRequest() || response != _event.getResponse() || context != _event.getServletContext())
292 _event=new AsyncEventState(context,request,response);
293 else
294 {
295 _event._dispatchContext=null;
296 _event._path=null;
297 }
298
299 _state=__ASYNCSTARTED;
300 break;
301
302 default:
303 throw new IllegalStateException(this.getStatusString());
304 }
305 }
306
307 }
308
309
310
311
312
313
314
315
316
317 protected boolean unhandle()
318 {
319 synchronized (this)
320 {
321 List<ContinuationListener> listeners=_continuationListeners;
322
323 switch(_state)
324 {
325 case __REDISPATCHED:
326 case __DISPATCHED:
327 _state=__UNCOMPLETED;
328 return true;
329
330 case __IDLE:
331 throw new IllegalStateException(this.getStatusString());
332
333 case __ASYNCSTARTED:
334 _initial=false;
335 _state=__ASYNCWAIT;
336 scheduleTimeout();
337 if (_state==__ASYNCWAIT)
338 return true;
339 else if (_state==__COMPLETING)
340 {
341 _state=__UNCOMPLETED;
342 return true;
343 }
344 _initial=false;
345 _state=__REDISPATCHED;
346 return false;
347
348 case __REDISPATCHING:
349 _initial=false;
350 _state=__REDISPATCHED;
351 return false;
352
353 case __COMPLETING:
354 _initial=false;
355 _state=__UNCOMPLETED;
356 return true;
357
358 default:
359 throw new IllegalStateException(this.getStatusString());
360 }
361 }
362 }
363
364
365 public void dispatch()
366 {
367 boolean dispatch=false;
368 synchronized (this)
369 {
370 switch(_state)
371 {
372 case __ASYNCSTARTED:
373 _state=__REDISPATCHING;
374 _resumed=true;
375 return;
376
377 case __ASYNCWAIT:
378 dispatch=!_expired;
379 _state=__REDISPATCH;
380 _resumed=true;
381 break;
382
383 case __REDISPATCH:
384 return;
385
386 default:
387 throw new IllegalStateException(this.getStatusString());
388 }
389 }
390
391 if (dispatch)
392 {
393 cancelTimeout();
394 scheduleDispatch();
395 }
396 }
397
398
399 protected void expired()
400 {
401 final List<ContinuationListener> listeners;
402 synchronized (this)
403 {
404 switch(_state)
405 {
406 case __ASYNCSTARTED:
407 case __ASYNCWAIT:
408 listeners=_continuationListeners;
409 break;
410 default:
411 listeners=null;
412 return;
413 }
414 _expired=true;
415 }
416
417 if (listeners!=null)
418 {
419 for (int i=0;i<listeners.size();i++)
420 {
421 ContinuationListener listener=listeners.get(i);
422 try
423 {
424 listener.onTimeout(this);
425 }
426 catch(Exception e)
427 {
428 LOG.warn(e);
429 }
430 }
431 }
432
433 synchronized (this)
434 {
435 switch(_state)
436 {
437 case __ASYNCSTARTED:
438 case __ASYNCWAIT:
439 dispatch();
440 }
441 }
442
443 scheduleDispatch();
444 }
445
446
447
448
449
450 public void complete()
451 {
452
453 boolean dispatch=false;
454 synchronized (this)
455 {
456 switch(_state)
457 {
458 case __DISPATCHED:
459 case __REDISPATCHED:
460 throw new IllegalStateException(this.getStatusString());
461
462 case __ASYNCSTARTED:
463 _state=__COMPLETING;
464 return;
465
466 case __ASYNCWAIT:
467 _state=__COMPLETING;
468 dispatch=!_expired;
469 break;
470
471 default:
472 throw new IllegalStateException(this.getStatusString());
473 }
474 }
475
476 if (dispatch)
477 {
478 cancelTimeout();
479 scheduleDispatch();
480 }
481 }
482
483
484
485
486
487
488 protected void doComplete()
489 {
490 final List<ContinuationListener> listeners;
491 synchronized (this)
492 {
493 switch(_state)
494 {
495 case __UNCOMPLETED:
496 _state=__COMPLETED;
497 listeners=_continuationListeners;
498 break;
499
500 default:
501 listeners=null;
502 throw new IllegalStateException(this.getStatusString());
503 }
504 }
505
506 if (listeners!=null)
507 {
508 for(int i=0;i<listeners.size();i++)
509 {
510 try
511 {
512 listeners.get(i).onComplete(this);
513 }
514 catch(Exception e)
515 {
516 LOG.warn(e);
517 }
518 }
519 }
520 }
521
522
523 protected void recycle()
524 {
525 synchronized (this)
526 {
527
528 switch(_state)
529 {
530 case __DISPATCHED:
531 case __REDISPATCHED:
532 throw new IllegalStateException(getStatusString());
533 default:
534 _state=__IDLE;
535 }
536 _initial = true;
537 _resumed=false;
538 _expired=false;
539 _responseWrapped=false;
540 cancelTimeout();
541 _timeoutMs=DEFAULT_TIMEOUT;
542 _continuationListeners=null;
543 }
544 }
545
546
547 public void cancel()
548 {
549 synchronized (this)
550 {
551 cancelTimeout();
552 _continuationListeners=null;
553 }
554 }
555
556
557 protected void scheduleDispatch()
558 {
559 EndPoint endp=_connection.getEndPoint();
560 if (!endp.isBlocking())
561 {
562 ((AsyncEndPoint)endp).asyncDispatch();
563 }
564 }
565
566
567 protected void scheduleTimeout()
568 {
569 EndPoint endp=_connection.getEndPoint();
570 if (_timeoutMs>0)
571 {
572 if (endp.isBlocking())
573 {
574 synchronized(this)
575 {
576 _expireAt = System.currentTimeMillis()+_timeoutMs;
577 long wait=_timeoutMs;
578 while (_expireAt>0 && wait>0 && _connection.getServer().isRunning())
579 {
580 try
581 {
582 this.wait(wait);
583 }
584 catch (InterruptedException e)
585 {
586 LOG.ignore(e);
587 }
588 wait=_expireAt-System.currentTimeMillis();
589 }
590
591 if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning())
592 {
593 expired();
594 }
595 }
596 }
597 else
598 {
599 ((AsyncEndPoint)endp).scheduleTimeout(_event,_timeoutMs);
600 }
601 }
602 }
603
604
605 protected void cancelTimeout()
606 {
607 EndPoint endp=_connection.getEndPoint();
608 if (endp.isBlocking())
609 {
610 synchronized(this)
611 {
612 _expireAt=0;
613 this.notifyAll();
614 }
615 }
616 else
617 {
618 final AsyncEventState event=_event;
619 if (event!=null)
620 {
621 ((AsyncEndPoint)endp).cancelTimeout(event);
622 }
623 }
624 }
625
626
627 public boolean isCompleting()
628 {
629 synchronized (this)
630 {
631 return _state==__COMPLETING;
632 }
633 }
634
635
636 boolean isUncompleted()
637 {
638 synchronized (this)
639 {
640 return _state==__UNCOMPLETED;
641 }
642 }
643
644
645 public boolean isComplete()
646 {
647 synchronized (this)
648 {
649 return _state==__COMPLETED;
650 }
651 }
652
653
654
655 public boolean isAsyncStarted()
656 {
657 synchronized (this)
658 {
659 switch(_state)
660 {
661 case __ASYNCSTARTED:
662 case __REDISPATCHING:
663 case __REDISPATCH:
664 case __ASYNCWAIT:
665 return true;
666
667 default:
668 return false;
669 }
670 }
671 }
672
673
674
675 public boolean isAsync()
676 {
677 synchronized (this)
678 {
679 switch(_state)
680 {
681 case __IDLE:
682 case __DISPATCHED:
683 case __UNCOMPLETED:
684 case __COMPLETED:
685 return false;
686
687 default:
688 return true;
689 }
690 }
691 }
692
693
694 public void dispatch(ServletContext context, String path)
695 {
696 _event._dispatchContext=context;
697 _event._path=path;
698 dispatch();
699 }
700
701
702 public void dispatch(String path)
703 {
704 _event._path=path;
705 dispatch();
706 }
707
708
709 public Request getBaseRequest()
710 {
711 return _connection.getRequest();
712 }
713
714
715 public ServletRequest getRequest()
716 {
717 if (_event!=null)
718 return _event.getRequest();
719 return _connection.getRequest();
720 }
721
722
723 public ServletResponse getResponse()
724 {
725 if (_event!=null)
726 return _event.getResponse();
727 return _connection.getResponse();
728 }
729
730
731 public void start(final Runnable run)
732 {
733 final AsyncEventState event=_event;
734 if (event!=null)
735 {
736 _connection.getServer().getThreadPool().dispatch(new Runnable()
737 {
738 public void run()
739 {
740 ((Context)event.getServletContext()).getContextHandler().handle(run);
741 }
742 });
743 }
744 }
745
746
747 public boolean hasOriginalRequestAndResponse()
748 {
749 synchronized (this)
750 {
751 return (_event!=null && _event.getRequest()==_connection._request && _event.getResponse()==_connection._response);
752 }
753 }
754
755
756 public ContextHandler getContextHandler()
757 {
758 final AsyncEventState event=_event;
759 if (event!=null)
760 return ((Context)event.getServletContext()).getContextHandler();
761 return null;
762 }
763
764
765
766
767
768
769 public boolean isResumed()
770 {
771 synchronized (this)
772 {
773 return _resumed;
774 }
775 }
776
777
778
779
780 public boolean isExpired()
781 {
782 synchronized (this)
783 {
784 return _expired;
785 }
786 }
787
788
789
790
791
792 public void resume()
793 {
794 dispatch();
795 }
796
797
798
799
800
801 public void suspend(ServletResponse response)
802 {
803 _responseWrapped=!(response instanceof Response);
804 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response);
805 }
806
807
808
809
810
811 public void suspend()
812 {
813 _responseWrapped=false;
814 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());
815 }
816
817
818
819
820
821 public ServletResponse getServletResponse()
822 {
823 if (_responseWrapped && _event!=null && _event.getResponse()!=null)
824 return _event.getResponse();
825 return _connection.getResponse();
826 }
827
828
829
830
831
832 public Object getAttribute(String name)
833 {
834 return _connection.getRequest().getAttribute(name);
835 }
836
837
838
839
840
841 public void removeAttribute(String name)
842 {
843 _connection.getRequest().removeAttribute(name);
844 }
845
846
847
848
849
850 public void setAttribute(String name, Object attribute)
851 {
852 _connection.getRequest().setAttribute(name,attribute);
853 }
854
855
856
857
858
859 public void undispatch()
860 {
861 if (isSuspended())
862 {
863 if (LOG.isDebugEnabled())
864 throw new ContinuationThrowable();
865 else
866 throw __exception;
867 }
868 throw new IllegalStateException("!suspended");
869 }
870
871
872
873 public class AsyncEventState extends Timeout.Task implements Runnable
874 {
875 private final ServletContext _suspendedContext;
876 private final ServletRequest _request;
877 private final ServletResponse _response;
878 private ServletContext _dispatchContext;
879 private String _path;
880
881 public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
882 {
883 _suspendedContext=context;
884 _request=request;
885 _response=response;
886 }
887
888 public ServletContext getSuspendedContext()
889 {
890 return _suspendedContext;
891 }
892
893 public ServletContext getDispatchContext()
894 {
895 return _dispatchContext;
896 }
897
898 public ServletContext getServletContext()
899 {
900 return _dispatchContext==null?_suspendedContext:_dispatchContext;
901 }
902
903 public ServletRequest getRequest()
904 {
905 return _request;
906 }
907
908 public ServletResponse getResponse()
909 {
910 return _response;
911 }
912
913 public String getPath()
914 {
915 return _path;
916 }
917
918 @Override
919 public void expired()
920 {
921 AsyncContinuation.this.expired();
922 }
923
924 public void run()
925 {
926 AsyncContinuation.this.expired();
927 }
928 }
929 }