1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server;
15
16 import java.util.ArrayList;
17 import java.util.List;
18
19 import javax.servlet.ServletContext;
20 import javax.servlet.ServletRequest;
21 import javax.servlet.ServletResponse;
22
23 import org.eclipse.jetty.continuation.Continuation;
24 import org.eclipse.jetty.continuation.ContinuationListener;
25 import org.eclipse.jetty.continuation.ContinuationThrowable;
26 import org.eclipse.jetty.io.AsyncEndPoint;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.server.handler.ContextHandler;
29 import org.eclipse.jetty.server.handler.ContextHandler.Context;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32 import org.eclipse.jetty.util.thread.Timeout;
33
34
35
36
37
38 public class AsyncContinuation implements AsyncContext, Continuation
39 {
40 private static final Logger LOG = Log.getLogger(AsyncContinuation.class);
41
42 private final static long DEFAULT_TIMEOUT=30000L;
43
44 private final static ContinuationThrowable __exception = new ContinuationThrowable();
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 private static final int __IDLE=0;
60 private static final int __DISPATCHED=1;
61 private static final int __ASYNCSTARTED=2;
62 private static final int __REDISPATCHING=3;
63 private static final int __ASYNCWAIT=4;
64 private static final int __REDISPATCH=5;
65 private static final int __REDISPATCHED=6;
66 private static final int __COMPLETING=7;
67 private static final int __UNCOMPLETED=8;
68 private static final int __COMPLETED=9;
69
70
71
72 protected HttpConnection _connection;
73 private List<ContinuationListener> _continuationListeners;
74
75
76 private int _state;
77 private boolean _initial;
78 private boolean _resumed;
79 private boolean _expired;
80 private volatile boolean _responseWrapped;
81 private long _timeoutMs=DEFAULT_TIMEOUT;
82 private AsyncEventState _event;
83 private volatile long _expireAt;
84
85
86 protected AsyncContinuation()
87 {
88 _state=__IDLE;
89 _initial=true;
90 }
91
92
93 protected void setConnection(final HttpConnection connection)
94 {
95 synchronized(this)
96 {
97 _connection=connection;
98 }
99 }
100
101
102 public void addContinuationListener(ContinuationListener listener)
103 {
104 synchronized(this)
105 {
106 if (_continuationListeners==null)
107 _continuationListeners=new ArrayList<ContinuationListener>();
108 _continuationListeners.add(listener);
109 }
110 }
111
112
113 public void setTimeout(long ms)
114 {
115 synchronized(this)
116 {
117 _timeoutMs=ms;
118 }
119 }
120
121
122 public long getTimeout()
123 {
124 synchronized(this)
125 {
126 return _timeoutMs;
127 }
128 }
129
130
131 public AsyncEventState getAsyncEventState()
132 {
133 synchronized(this)
134 {
135 return _event;
136 }
137 }
138
139
140
141
142
143
144
145
146
147
148 public boolean isResponseWrapped()
149 {
150 return _responseWrapped;
151 }
152
153
154
155
156
157 public boolean isInitial()
158 {
159 synchronized(this)
160 {
161 return _initial;
162 }
163 }
164
165
166
167
168
169 public boolean isSuspended()
170 {
171 synchronized(this)
172 {
173 switch(_state)
174 {
175 case __ASYNCSTARTED:
176 case __REDISPATCHING:
177 case __COMPLETING:
178 case __ASYNCWAIT:
179 return true;
180
181 default:
182 return false;
183 }
184 }
185 }
186
187
188 @Override
189 public String toString()
190 {
191 synchronized (this)
192 {
193 return super.toString()+"@"+getStatusString();
194 }
195 }
196
197
198 public String getStatusString()
199 {
200 synchronized (this)
201 {
202 return
203 ((_state==__IDLE)?"IDLE":
204 (_state==__DISPATCHED)?"DISPATCHED":
205 (_state==__ASYNCSTARTED)?"ASYNCSTARTED":
206 (_state==__ASYNCWAIT)?"ASYNCWAIT":
207 (_state==__REDISPATCHING)?"REDISPATCHING":
208 (_state==__REDISPATCH)?"REDISPATCH":
209 (_state==__REDISPATCHED)?"REDISPATCHED":
210 (_state==__COMPLETING)?"COMPLETING":
211 (_state==__UNCOMPLETED)?"UNCOMPLETED":
212 (_state==__COMPLETED)?"COMPLETE":
213 ("UNKNOWN?"+_state))+
214 (_initial?",initial":"")+
215 (_resumed?",resumed":"")+
216 (_expired?",expired":"");
217 }
218 }
219
220
221
222
223
224 protected boolean handling()
225 {
226 synchronized (this)
227 {
228 _responseWrapped=false;
229
230 switch(_state)
231 {
232 case __IDLE:
233 _initial=true;
234 _state=__DISPATCHED;
235 return true;
236
237 case __COMPLETING:
238 _state=__UNCOMPLETED;
239 return false;
240
241 case __ASYNCWAIT:
242 return false;
243
244 case __REDISPATCH:
245 _state=__REDISPATCHED;
246 return true;
247
248 default:
249 throw new IllegalStateException(this.getStatusString());
250 }
251 }
252 }
253
254
255
256
257
258 protected void suspend(final ServletContext context,
259 final ServletRequest request,
260 final ServletResponse response)
261 {
262 synchronized (this)
263 {
264 switch(_state)
265 {
266 case __DISPATCHED:
267 case __REDISPATCHED:
268 _resumed=false;
269 _expired=false;
270
271 if (_event==null || request!=_event.getRequest() || response != _event.getResponse() || context != _event.getServletContext())
272 _event=new AsyncEventState(context,request,response);
273 else
274 {
275 _event._dispatchContext=null;
276 _event._path=null;
277 }
278
279 _state=__ASYNCSTARTED;
280 break;
281
282 default:
283 throw new IllegalStateException(this.getStatusString());
284 }
285 }
286
287 }
288
289
290
291
292
293
294
295
296
297 protected boolean unhandle()
298 {
299 synchronized (this)
300 {
301 List<ContinuationListener> listeners=_continuationListeners;
302
303 switch(_state)
304 {
305 case __REDISPATCHED:
306 case __DISPATCHED:
307 _state=__UNCOMPLETED;
308 return true;
309
310 case __IDLE:
311 throw new IllegalStateException(this.getStatusString());
312
313 case __ASYNCSTARTED:
314 _initial=false;
315 _state=__ASYNCWAIT;
316 scheduleTimeout();
317 if (_state==__ASYNCWAIT)
318 return true;
319 else if (_state==__COMPLETING)
320 {
321 _state=__UNCOMPLETED;
322 return true;
323 }
324 _initial=false;
325 _state=__REDISPATCHED;
326 return false;
327
328 case __REDISPATCHING:
329 _initial=false;
330 _state=__REDISPATCHED;
331 return false;
332
333 case __COMPLETING:
334 _initial=false;
335 _state=__UNCOMPLETED;
336 return true;
337
338 default:
339 throw new IllegalStateException(this.getStatusString());
340 }
341 }
342 }
343
344
345 public void dispatch()
346 {
347 boolean dispatch=false;
348 synchronized (this)
349 {
350 switch(_state)
351 {
352 case __ASYNCSTARTED:
353 _state=__REDISPATCHING;
354 _resumed=true;
355 return;
356
357 case __ASYNCWAIT:
358 dispatch=!_expired;
359 _state=__REDISPATCH;
360 _resumed=true;
361 break;
362
363 case __REDISPATCH:
364 return;
365
366 default:
367 throw new IllegalStateException(this.getStatusString());
368 }
369 }
370
371 if (dispatch)
372 {
373 cancelTimeout();
374 scheduleDispatch();
375 }
376 }
377
378
379 protected void expired()
380 {
381 final List<ContinuationListener> listeners;
382 synchronized (this)
383 {
384 switch(_state)
385 {
386 case __ASYNCSTARTED:
387 case __ASYNCWAIT:
388 listeners=_continuationListeners;
389 break;
390 default:
391 listeners=null;
392 return;
393 }
394 _expired=true;
395 }
396
397 if (listeners!=null)
398 {
399 for (int i=0;i<listeners.size();i++)
400 {
401 ContinuationListener listener=listeners.get(i);
402 try
403 {
404 listener.onTimeout(this);
405 }
406 catch(Exception e)
407 {
408 LOG.warn(e);
409 }
410 }
411 }
412
413 synchronized (this)
414 {
415 switch(_state)
416 {
417 case __ASYNCSTARTED:
418 case __ASYNCWAIT:
419 dispatch();
420 }
421 }
422
423 scheduleDispatch();
424 }
425
426
427
428
429
430 public void complete()
431 {
432
433 boolean dispatch=false;
434 synchronized (this)
435 {
436 switch(_state)
437 {
438 case __DISPATCHED:
439 case __REDISPATCHED:
440 throw new IllegalStateException(this.getStatusString());
441
442 case __ASYNCSTARTED:
443 _state=__COMPLETING;
444 return;
445
446 case __ASYNCWAIT:
447 _state=__COMPLETING;
448 dispatch=!_expired;
449 break;
450
451 default:
452 throw new IllegalStateException(this.getStatusString());
453 }
454 }
455
456 if (dispatch)
457 {
458 cancelTimeout();
459 scheduleDispatch();
460 }
461 }
462
463
464
465
466
467
468 protected void doComplete()
469 {
470 final List<ContinuationListener> listeners;
471 synchronized (this)
472 {
473 switch(_state)
474 {
475 case __UNCOMPLETED:
476 _state=__COMPLETED;
477 listeners=_continuationListeners;
478 break;
479
480 default:
481 listeners=null;
482 throw new IllegalStateException(this.getStatusString());
483 }
484 }
485
486 if (listeners!=null)
487 {
488 for(int i=0;i<listeners.size();i++)
489 {
490 try
491 {
492 listeners.get(i).onComplete(this);
493 }
494 catch(Exception e)
495 {
496 LOG.warn(e);
497 }
498 }
499 }
500 }
501
502
503 protected void recycle()
504 {
505 synchronized (this)
506 {
507
508 switch(_state)
509 {
510 case __DISPATCHED:
511 case __REDISPATCHED:
512 throw new IllegalStateException(getStatusString());
513 default:
514 _state=__IDLE;
515 }
516 _initial = true;
517 _resumed=false;
518 _expired=false;
519 _responseWrapped=false;
520 cancelTimeout();
521 _timeoutMs=DEFAULT_TIMEOUT;
522 _continuationListeners=null;
523 }
524 }
525
526
527 public void cancel()
528 {
529 synchronized (this)
530 {
531 cancelTimeout();
532 _continuationListeners=null;
533 }
534 }
535
536
537 protected void scheduleDispatch()
538 {
539 EndPoint endp=_connection.getEndPoint();
540 if (!endp.isBlocking())
541 {
542 ((AsyncEndPoint)endp).dispatch();
543 }
544 }
545
546
547 protected void scheduleTimeout()
548 {
549 EndPoint endp=_connection.getEndPoint();
550 if (_timeoutMs>0)
551 {
552 if (endp.isBlocking())
553 {
554 synchronized(this)
555 {
556 _expireAt = System.currentTimeMillis()+_timeoutMs;
557 long wait=_timeoutMs;
558 while (_expireAt>0 && wait>0 && _connection.getServer().isRunning())
559 {
560 try
561 {
562 this.wait(wait);
563 }
564 catch (InterruptedException e)
565 {
566 LOG.ignore(e);
567 }
568 wait=_expireAt-System.currentTimeMillis();
569 }
570
571 if (_expireAt>0 && wait<=0 && _connection.getServer().isRunning())
572 {
573 expired();
574 }
575 }
576 }
577 else
578 {
579 _connection.scheduleTimeout(_event,_timeoutMs);
580 }
581 }
582 }
583
584
585 protected void cancelTimeout()
586 {
587 EndPoint endp=_connection.getEndPoint();
588 if (endp.isBlocking())
589 {
590 synchronized(this)
591 {
592 _expireAt=0;
593 this.notifyAll();
594 }
595 }
596 else
597 {
598 final AsyncEventState event=_event;
599 if (event!=null)
600 _connection.cancelTimeout(event);
601 }
602 }
603
604
605 public boolean isCompleting()
606 {
607 synchronized (this)
608 {
609 return _state==__COMPLETING;
610 }
611 }
612
613
614 boolean isUncompleted()
615 {
616 synchronized (this)
617 {
618 return _state==__UNCOMPLETED;
619 }
620 }
621
622
623 public boolean isComplete()
624 {
625 synchronized (this)
626 {
627 return _state==__COMPLETED;
628 }
629 }
630
631
632
633 public boolean isAsyncStarted()
634 {
635 synchronized (this)
636 {
637 switch(_state)
638 {
639 case __ASYNCSTARTED:
640 case __REDISPATCHING:
641 case __REDISPATCH:
642 case __ASYNCWAIT:
643 return true;
644
645 default:
646 return false;
647 }
648 }
649 }
650
651
652
653 public boolean isAsync()
654 {
655 synchronized (this)
656 {
657 switch(_state)
658 {
659 case __IDLE:
660 case __DISPATCHED:
661 case __UNCOMPLETED:
662 case __COMPLETED:
663 return false;
664
665 default:
666 return true;
667 }
668 }
669 }
670
671
672 public void dispatch(ServletContext context, String path)
673 {
674 _event._dispatchContext=context;
675 _event._path=path;
676 dispatch();
677 }
678
679
680 public void dispatch(String path)
681 {
682 _event._path=path;
683 dispatch();
684 }
685
686
687 public Request getBaseRequest()
688 {
689 return _connection.getRequest();
690 }
691
692
693 public ServletRequest getRequest()
694 {
695 if (_event!=null)
696 return _event.getRequest();
697 return _connection.getRequest();
698 }
699
700
701 public ServletResponse getResponse()
702 {
703 if (_event!=null)
704 return _event.getResponse();
705 return _connection.getResponse();
706 }
707
708
709 public void start(final Runnable run)
710 {
711 final AsyncEventState event=_event;
712 if (event!=null)
713 {
714 _connection.getServer().getThreadPool().dispatch(new Runnable()
715 {
716 public void run()
717 {
718 ((Context)event.getServletContext()).getContextHandler().handle(run);
719 }
720 });
721 }
722 }
723
724
725 public boolean hasOriginalRequestAndResponse()
726 {
727 synchronized (this)
728 {
729 return (_event!=null && _event.getRequest()==_connection._request && _event.getResponse()==_connection._response);
730 }
731 }
732
733
734 public ContextHandler getContextHandler()
735 {
736 final AsyncEventState event=_event;
737 if (event!=null)
738 return ((Context)event.getServletContext()).getContextHandler();
739 return null;
740 }
741
742
743
744
745
746
747 public boolean isResumed()
748 {
749 synchronized (this)
750 {
751 return _resumed;
752 }
753 }
754
755
756
757
758 public boolean isExpired()
759 {
760 synchronized (this)
761 {
762 return _expired;
763 }
764 }
765
766
767
768
769
770 public void resume()
771 {
772 dispatch();
773 }
774
775
776
777
778
779 public void suspend(ServletResponse response)
780 {
781 _responseWrapped=!(response instanceof Response);
782 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),response);
783 }
784
785
786
787
788
789 public void suspend()
790 {
791 _responseWrapped=false;
792 AsyncContinuation.this.suspend(_connection.getRequest().getServletContext(),_connection.getRequest(),_connection.getResponse());
793 }
794
795
796
797
798
799 public ServletResponse getServletResponse()
800 {
801 if (_responseWrapped && _event!=null && _event.getResponse()!=null)
802 return _event.getResponse();
803 return _connection.getResponse();
804 }
805
806
807
808
809
810 public Object getAttribute(String name)
811 {
812 return _connection.getRequest().getAttribute(name);
813 }
814
815
816
817
818
819 public void removeAttribute(String name)
820 {
821 _connection.getRequest().removeAttribute(name);
822 }
823
824
825
826
827
828 public void setAttribute(String name, Object attribute)
829 {
830 _connection.getRequest().setAttribute(name,attribute);
831 }
832
833
834
835
836
837 public void undispatch()
838 {
839 if (isSuspended())
840 {
841 if (LOG.isDebugEnabled())
842 throw new ContinuationThrowable();
843 else
844 throw __exception;
845 }
846 throw new IllegalStateException("!suspended");
847 }
848
849
850
851 public class AsyncEventState extends Timeout.Task implements Runnable
852 {
853 private final ServletContext _suspendedContext;
854 private final ServletRequest _request;
855 private final ServletResponse _response;
856 private ServletContext _dispatchContext;
857 private String _path;
858
859 public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
860 {
861 _suspendedContext=context;
862 _request=request;
863 _response=response;
864 }
865
866 public ServletContext getSuspendedContext()
867 {
868 return _suspendedContext;
869 }
870
871 public ServletContext getDispatchContext()
872 {
873 return _dispatchContext;
874 }
875
876 public ServletContext getServletContext()
877 {
878 return _dispatchContext==null?_suspendedContext:_dispatchContext;
879 }
880
881 public ServletRequest getRequest()
882 {
883 return _request;
884 }
885
886 public ServletResponse getResponse()
887 {
888 return _response;
889 }
890
891 public String getPath()
892 {
893 return _path;
894 }
895
896 @Override
897 public void expired()
898 {
899 AsyncContinuation.this.expired();
900 }
901
902 public void run()
903 {
904 AsyncContinuation.this.expired();
905 }
906 }
907 }