1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server;
15
16 import java.util.ArrayList;
17 import java.util.List;
18
19 import javax.servlet.ServletContext;
20 import javax.servlet.ServletRequest;
21 import javax.servlet.ServletResponse;
22
23 import org.eclipse.jetty.continuation.Continuation;
24 import org.eclipse.jetty.continuation.ContinuationListener;
25 import org.eclipse.jetty.continuation.ContinuationThrowable;
26 import org.eclipse.jetty.io.AsyncEndPoint;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.server.handler.ContextHandler;
29 import org.eclipse.jetty.server.handler.ContextHandler.Context;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32 import org.eclipse.jetty.util.thread.Timeout;
33
34
35
36
37
38 public class AsyncContinuation implements AsyncContext, Continuation
39 {
40 private static final Logger LOG = Log.getLogger(AsyncContinuation.class);
41
42 private final static long DEFAULT_TIMEOUT=30000L;
43
44 private final static ContinuationThrowable __exception = new ContinuationThrowable();
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 private static final int __IDLE=0;
60 private static final int __DISPATCHED=1;
61 private static final int __ASYNCSTARTED=2;
62 private static final int __REDISPATCHING=3;
63 private static final int __ASYNCWAIT=4;
64 private static final int __REDISPATCH=5;
65 private static final int __REDISPATCHED=6;
66 private static final int __COMPLETING=7;
67 private static final int __UNCOMPLETED=8;
68 private static final int __COMPLETED=9;
69
70
71
72 protected AbstractHttpConnection _connection;
73 private List<ContinuationListener> _continuationListeners;
74
75
76 private int _state;
77 private boolean _initial;
78 private boolean _resumed;
79 private boolean _expired;
80 private volatile boolean _responseWrapped;
81 private long _timeoutMs=DEFAULT_TIMEOUT;
82 private AsyncEventState _event;
83 private volatile long _expireAt;
84
85
/**
 * Creates a continuation that starts idle and is flagged as an initial dispatch.
 */
protected AsyncContinuation()
{
    _initial = true;
    _state = __IDLE;
}
91
92
93 protected void setConnection(final AbstractHttpConnection connection)
94 {
95 synchronized(this)
96 {
97 _connection=connection;
98 }
99 }
100
101
102 public void addContinuationListener(ContinuationListener listener)
103 {
104 synchronized(this)
105 {
106 if (_continuationListeners==null)
107 _continuationListeners=new ArrayList<ContinuationListener>();
108 _continuationListeners.add(listener);
109 }
110 }
111
112
113 public void setTimeout(long ms)
114 {
115 synchronized(this)
116 {
117 _timeoutMs=ms;
118 }
119 }
120
121
122 public long getTimeout()
123 {
124 synchronized(this)
125 {
126 return _timeoutMs;
127 }
128 }
129
130
131 public AsyncEventState getAsyncEventState()
132 {
133 synchronized(this)
134 {
135 return _event;
136 }
137 }
138
139
140
141
142
143
144
145
146
147
148 public boolean isResponseWrapped()
149 {
150 return _responseWrapped;
151 }
152
153
154
155
156
157 public boolean isInitial()
158 {
159 synchronized(this)
160 {
161 return _initial;
162 }
163 }
164
165
166
167
168
169 public boolean isSuspended()
170 {
171 synchronized(this)
172 {
173 switch(_state)
174 {
175 case __ASYNCSTARTED:
176 case __REDISPATCHING:
177 case __COMPLETING:
178 case __ASYNCWAIT:
179 return true;
180
181 default:
182 return false;
183 }
184 }
185 }
186
187
188
189
190
191 public boolean isSuspending()
192 {
193 synchronized(this)
194 {
195 switch(_state)
196 {
197 case __ASYNCSTARTED:
198 case __ASYNCWAIT:
199 return true;
200
201 default:
202 return false;
203 }
204 }
205 }
206
207
208 public boolean isDispatchable()
209 {
210 synchronized(this)
211 {
212 switch(_state)
213 {
214 case __REDISPATCH:
215 case __REDISPATCHED:
216 case __REDISPATCHING:
217 case __COMPLETING:
218 return true;
219
220 default:
221 return false;
222 }
223 }
224 }
225
226
227 @Override
228 public String toString()
229 {
230 synchronized (this)
231 {
232 return super.toString()+"@"+getStatusString();
233 }
234 }
235
236
237 public String getStatusString()
238 {
239 synchronized (this)
240 {
241 return
242 ((_state==__IDLE)?"IDLE":
243 (_state==__DISPATCHED)?"DISPATCHED":
244 (_state==__ASYNCSTARTED)?"ASYNCSTARTED":
245 (_state==__ASYNCWAIT)?"ASYNCWAIT":
246 (_state==__REDISPATCHING)?"REDISPATCHING":
247 (_state==__REDISPATCH)?"REDISPATCH":
248 (_state==__REDISPATCHED)?"REDISPATCHED":
249 (_state==__COMPLETING)?"COMPLETING":
250 (_state==__UNCOMPLETED)?"UNCOMPLETED":
251 (_state==__COMPLETED)?"COMPLETE":
252 ("UNKNOWN?"+_state))+
253 (_initial?",initial":"")+
254 (_resumed?",resumed":"")+
255 (_expired?",expired":"");
256 }
257 }
258
259
260
261
262
263 protected boolean handling()
264 {
265 synchronized (this)
266 {
267 _responseWrapped=false;
268
269 switch(_state)
270 {
271 case __IDLE:
272 _initial=true;
273 _state=__DISPATCHED;
274 return true;
275
276 case __COMPLETING:
277 _state=__UNCOMPLETED;
278 return false;
279
280 case __ASYNCWAIT:
281 return false;
282
283 case __REDISPATCH:
284 _state=__REDISPATCHED;
285 return true;
286
287 default:
288 throw new IllegalStateException(this.getStatusString());
289 }
290 }
291 }
292
293
294
295
296
297 protected void suspend(final ServletContext context,
298 final ServletRequest request,
299 final ServletResponse response)
300 {
301 synchronized (this)
302 {
303 switch(_state)
304 {
305 case __DISPATCHED:
306 case __REDISPATCHED:
307 _resumed=false;
308 _expired=false;
309
310 if (_event==null || request!=_event.getRequest() || response != _event.getResponse() || context != _event.getServletContext())
311 _event=new AsyncEventState(context,request,response);
312 else
313 {
314 _event._dispatchContext=null;
315 _event._path=null;
316 }
317
318 _state=__ASYNCSTARTED;
319 break;
320
321 default:
322 throw new IllegalStateException(this.getStatusString());
323 }
324 }
325
326 }
327
328
329
330
331
332
333
334
335
336 protected boolean unhandle()
337 {
338 synchronized (this)
339 {
340 List<ContinuationListener> listeners=_continuationListeners;
341
342 switch(_state)
343 {
344 case __REDISPATCHED:
345 case __DISPATCHED:
346 _state=__UNCOMPLETED;
347 return true;
348
349 case __IDLE:
350 throw new IllegalStateException(this.getStatusString());
351
352 case __ASYNCSTARTED:
353 _initial=false;
354 _state=__ASYNCWAIT;
355 scheduleTimeout();
356 if (_state==__ASYNCWAIT)
357 return true;
358 else if (_state==__COMPLETING)
359 {
360 _state=__UNCOMPLETED;
361 return true;
362 }
363 _initial=false;
364 _state=__REDISPATCHED;
365 return false;
366
367 case __REDISPATCHING:
368 _initial=false;
369 _state=__REDISPATCHED;
370 return false;
371
372 case __COMPLETING:
373 _initial=false;
374 _state=__UNCOMPLETED;
375 return true;
376
377 default:
378 throw new IllegalStateException(this.getStatusString());
379 }
380 }
381 }
382
383
384 public void dispatch()
385 {
386 boolean dispatch=false;
387 synchronized (this)
388 {
389 switch(_state)
390 {
391 case __ASYNCSTARTED:
392 _state=__REDISPATCHING;
393 _resumed=true;
394 return;
395
396 case __ASYNCWAIT:
397 dispatch=!_expired;
398 _state=__REDISPATCH;
399 _resumed=true;
400 break;
401
402 case __REDISPATCH:
403 return;
404
405 default:
406 throw new IllegalStateException(this.getStatusString());
407 }
408 }
409
410 if (dispatch)
411 {
412 cancelTimeout();
413 scheduleDispatch();
414 }
415 }
416
417
418 protected void expired()
419 {
420 final List<ContinuationListener> listeners;
421 synchronized (this)
422 {
423 switch(_state)
424 {
425 case __ASYNCSTARTED:
426 case __ASYNCWAIT:
427 listeners=_continuationListeners;
428 break;
429 default:
430 listeners=null;
431 return;
432 }
433 _expired=true;
434 }
435
436 if (listeners!=null)
437 {
438 for (int i=0;i<listeners.size();i++)
439 {
440 ContinuationListener listener=listeners.get(i);
441 try
442 {
443 listener.onTimeout(this);
444 }
445 catch(Exception e)
446 {
447 LOG.warn(e);
448 }
449 }
450 }
451
452 synchronized (this)
453 {
454 switch(_state)
455 {
456 case __ASYNCSTARTED:
457 case __ASYNCWAIT:
458 dispatch();
459 }
460 }
461
462 scheduleDispatch();
463 }
464
465
466
467
468
469 public void complete()
470 {
471
472 boolean dispatch=false;
473 synchronized (this)
474 {
475 switch(_state)
476 {
477 case __DISPATCHED:
478 case __REDISPATCHED:
479 throw new IllegalStateException(this.getStatusString());
480
481 case __ASYNCSTARTED:
482 _state=__COMPLETING;
483 return;
484
485 case __ASYNCWAIT:
486 _state=__COMPLETING;
487 dispatch=!_expired;
488 break;
489
490 default:
491 throw new IllegalStateException(this.getStatusString());
492 }
493 }
494
495 if (dispatch)
496 {
497 cancelTimeout();
498 scheduleDispatch();
499 }
500 }
501
502
503
504
505
506
507 protected void doComplete()
508 {
509 final List<ContinuationListener> listeners;
510 synchronized (this)
511 {
512 switch(_state)
513 {
514 case __UNCOMPLETED:
515 _state=__COMPLETED;
516 listeners=_continuationListeners;
517 break;
518
519 default:
520 listeners=null;
521 throw new IllegalStateException(this.getStatusString());
522 }
523 }
524
525 if (listeners!=null)
526 {
527 for(int i=0;i<listeners.size();i++)
528 {
529 try
530 {
531 listeners.get(i).onComplete(this);
532 }
533 catch(Exception e)
534 {
535 LOG.warn(e);
536 }
537 }
538 }
539 }
540
541
542 protected void recycle()
543 {
544 synchronized (this)
545 {
546
547 switch(_state)
548 {
549 case __DISPATCHED:
550 case __REDISPATCHED:
551 throw new IllegalStateException(getStatusString());
552 default:
553 _state=__IDLE;
554 }
555 _initial = true;
556 _resumed=false;
557 _expired=false;
558 _responseWrapped=false;
559 cancelTimeout();
560 _timeoutMs=DEFAULT_TIMEOUT;
561 _continuationListeners=null;
562 }
563 }
564
565
566 public void cancel()
567 {
568 synchronized (this)
569 {
570 cancelTimeout();
571 _continuationListeners=null;
572 }
573 }
574
575
576 protected void scheduleDispatch()
577 {
578 EndPoint endp=_connection.getEndPoint();
579 if (!endp.isBlocking())
580 {
581 ((AsyncEndPoint)endp).asyncDispatch();
582 }
583 }
584
585
586 protected void scheduleTimeout()
587 {
588 EndPoint endp=_connection.getEndPoint();
589 if (_timeoutMs>0)
590 {
591 if (endp.isBlocking())
592 {
593 synchronized(this)
594 {
595 _expireAt = System.currentTimeMillis()+_timeoutMs;
596 long wait=_timeoutMs;
597 while (_expireAt>0 && wait>0 && _connection.getServer().isRunning())
598 {
599 try
600 {
601 this.wait(wait);
602 }
603 catch (InterruptedException e)
604 {
605 LOG.ignore(e);
606 }
607 wait=_expireAt-System.currentTimeMillis();
608 }
609
610 if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning())
611 {
612 expired();
613 }
614 }
615 }
616 else
617 {
618 ((AsyncEndPoint)endp).scheduleTimeout(_event,_timeoutMs);
619 }
620 }
621 }
622
623
624 protected void cancelTimeout()
625 {
626 EndPoint endp=_connection.getEndPoint();
627 if (endp.isBlocking())
628 {
629 synchronized(this)
630 {
631 _expireAt=0;
632 this.notifyAll();
633 }
634 }
635 else
636 {
637 final AsyncEventState event=_event;
638 if (event!=null)
639 {
640 ((AsyncEndPoint)endp).cancelTimeout(event);
641 }
642 }
643 }
644
645
646 public boolean isCompleting()
647 {
648 synchronized (this)
649 {
650 return _state==__COMPLETING;
651 }
652 }
653
654
655 boolean isUncompleted()
656 {
657 synchronized (this)
658 {
659 return _state==__UNCOMPLETED;
660 }
661 }
662
663
664 public boolean isComplete()
665 {
666 synchronized (this)
667 {
668 return _state==__COMPLETED;
669 }
670 }
671
672
673
674 public boolean isAsyncStarted()
675 {
676 synchronized (this)
677 {
678 switch(_state)
679 {
680 case __ASYNCSTARTED:
681 case __REDISPATCHING:
682 case __REDISPATCH:
683 case __ASYNCWAIT:
684 return true;
685
686 default:
687 return false;
688 }
689 }
690 }
691
692
693
694 public boolean isAsync()
695 {
696 synchronized (this)
697 {
698 switch(_state)
699 {
700 case __IDLE:
701 case __DISPATCHED:
702 case __UNCOMPLETED:
703 case __COMPLETED:
704 return false;
705
706 default:
707 return true;
708 }
709 }
710 }
711
712
713 public void dispatch(ServletContext context, String path)
714 {
715 _event._dispatchContext=context;
716 _event._path=path;
717 dispatch();
718 }
719
720
721 public void dispatch(String path)
722 {
723 _event._path=path;
724 dispatch();
725 }
726
727
728 public Request getBaseRequest()
729 {
730 return _connection.getRequest();
731 }
732
733
734 public ServletRequest getRequest()
735 {
736 if (_event!=null)
737 return _event.getRequest();
738 return _connection.getRequest();
739 }
740
741
742 public ServletResponse getResponse()
743 {
744 if (_event!=null)
745 return _event.getResponse();
746 return _connection.getResponse();
747 }
748
749
750 public void start(final Runnable run)
751 {
752 final AsyncEventState event=_event;
753 if (event!=null)
754 {
755 _connection.getServer().getThreadPool().dispatch(new Runnable()
756 {
757 public void run()
758 {
759 ((Context)event.getServletContext()).getContextHandler().handle(run);
760 }
761 });
762 }
763 }
764
765
766 public boolean hasOriginalRequestAndResponse()
767 {
768 synchronized (this)
769 {
770 return (_event!=null && _event.getRequest()==_connection._request && _event.getResponse()==_connection._response);
771 }
772 }
773
774
775 public ContextHandler getContextHandler()
776 {
777 final AsyncEventState event=_event;
778 if (event!=null)
779 return ((Context)event.getServletContext()).getContextHandler();
780 return null;
781 }
782
783
784
785
786
787
788 public boolean isResumed()
789 {
790 synchronized (this)
791 {
792 return _resumed;
793 }
794 }
795
796
797
798
799 public boolean isExpired()
800 {
801 synchronized (this)
802 {
803 return _expired;
804 }
805 }
806
807
808
809
810
811 public void resume()
812 {
813 dispatch();
814 }
815
816
817
818
819
820 public void suspend(ServletResponse response)
821 {
822 _responseWrapped=!(response instanceof Response);
823 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response);
824 }
825
826
827
828
829
830 public void suspend()
831 {
832 _responseWrapped=false;
833 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());
834 }
835
836
837
838
839
840 public ServletResponse getServletResponse()
841 {
842 if (_responseWrapped && _event!=null && _event.getResponse()!=null)
843 return _event.getResponse();
844 return _connection.getResponse();
845 }
846
847
848
849
850
851 public Object getAttribute(String name)
852 {
853 return _connection.getRequest().getAttribute(name);
854 }
855
856
857
858
859
860 public void removeAttribute(String name)
861 {
862 _connection.getRequest().removeAttribute(name);
863 }
864
865
866
867
868
869 public void setAttribute(String name, Object attribute)
870 {
871 _connection.getRequest().setAttribute(name,attribute);
872 }
873
874
875
876
877
878 public void undispatch()
879 {
880 if (isSuspended())
881 {
882 if (LOG.isDebugEnabled())
883 throw new ContinuationThrowable();
884 else
885 throw __exception;
886 }
887 throw new IllegalStateException("!suspended");
888 }
889
890
891
892 public class AsyncEventState extends Timeout.Task implements Runnable
893 {
894 private final ServletContext _suspendedContext;
895 private final ServletRequest _request;
896 private final ServletResponse _response;
897 private ServletContext _dispatchContext;
898 private String _path;
899
900 public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
901 {
902 _suspendedContext=context;
903 _request=request;
904 _response=response;
905 }
906
907 public ServletContext getSuspendedContext()
908 {
909 return _suspendedContext;
910 }
911
912 public ServletContext getDispatchContext()
913 {
914 return _dispatchContext;
915 }
916
917 public ServletContext getServletContext()
918 {
919 return _dispatchContext==null?_suspendedContext:_dispatchContext;
920 }
921
922 public ServletRequest getRequest()
923 {
924 return _request;
925 }
926
927 public ServletResponse getResponse()
928 {
929 return _response;
930 }
931
932 public String getPath()
933 {
934 return _path;
935 }
936
937 @Override
938 public void expired()
939 {
940 AsyncContinuation.this.expired();
941 }
942
943 public void run()
944 {
945 AsyncContinuation.this.expired();
946 }
947 }
948 }