1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25
26 import javax.servlet.AsyncListener;
27 import javax.servlet.RequestDispatcher;
28 import javax.servlet.ServletContext;
29 import javax.servlet.ServletResponse;
30
31 import org.eclipse.jetty.server.handler.ContextHandler;
32 import org.eclipse.jetty.server.handler.ContextHandler.Context;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35 import org.eclipse.jetty.util.thread.Locker;
36 import org.eclipse.jetty.util.thread.Scheduler;
37
38
39
40
41 public class HttpChannelState
42 {
43 private static final Logger LOG = Log.getLogger(HttpChannelState.class);
44
45 private final static long DEFAULT_TIMEOUT=Long.getLong("org.eclipse.jetty.server.HttpChannelState.DEFAULT_TIMEOUT",30000L);
46
47
48
49
50 public enum State
51 {
52 IDLE,
53 DISPATCHED,
54 ASYNC_WAIT,
55 ASYNC_WOKEN,
56 ASYNC_IO,
57 COMPLETING,
58 COMPLETED,
59 UPGRADED
60 }
61
62
63
64
65 public enum Action
66 {
67 DISPATCH,
68 ASYNC_DISPATCH,
69 ERROR_DISPATCH,
70 ASYNC_ERROR,
71 WRITE_CALLBACK,
72 READ_CALLBACK,
73 COMPLETE,
74 TERMINATED,
75 WAIT,
76 }
77
78
79
80
81
82
83 public enum Async
84 {
85 STARTED,
86 DISPATCH,
87 COMPLETE,
88 EXPIRING,
89 EXPIRED,
90 ERRORING,
91 ERRORED
92 }
93
94 private final boolean DEBUG=LOG.isDebugEnabled();
95 private final Locker _locker=new Locker();
96 private final HttpChannel _channel;
97
98 private List<AsyncListener> _asyncListeners;
99 private State _state;
100 private Async _async;
101 private boolean _initial;
102 private boolean _asyncReadPossible;
103 private boolean _asyncReadUnready;
104 private boolean _asyncWrite;
105 private long _timeoutMs=DEFAULT_TIMEOUT;
106 private AsyncContextEvent _event;
107
108 protected HttpChannelState(HttpChannel channel)
109 {
110 _channel=channel;
111 _state=State.IDLE;
112 _async=null;
113 _initial=true;
114 }
115
116 public State getState()
117 {
118 try(Locker.Lock lock= _locker.lock())
119 {
120 return _state;
121 }
122 }
123
124 public void addListener(AsyncListener listener)
125 {
126 try(Locker.Lock lock= _locker.lock())
127 {
128 if (_asyncListeners==null)
129 _asyncListeners=new ArrayList<>();
130 _asyncListeners.add(listener);
131 }
132 }
133
134 public void setTimeout(long ms)
135 {
136 try(Locker.Lock lock= _locker.lock())
137 {
138 _timeoutMs=ms;
139 }
140 }
141
142 public long getTimeout()
143 {
144 try(Locker.Lock lock= _locker.lock())
145 {
146 return _timeoutMs;
147 }
148 }
149
150 public AsyncContextEvent getAsyncContextEvent()
151 {
152 try(Locker.Lock lock= _locker.lock())
153 {
154 return _event;
155 }
156 }
157
158 @Override
159 public String toString()
160 {
161 try(Locker.Lock lock= _locker.lock())
162 {
163 return String.format("%s@%x{s=%s a=%s i=%b r=%s w=%b}",getClass().getSimpleName(),hashCode(),_state,_async,_initial,
164 _asyncReadPossible?(_asyncReadUnready?"PU":"P!U"):(_asyncReadUnready?"!PU":"!P!U"),
165 _asyncWrite);
166 }
167 }
168
169 private String getStatusStringLocked()
170 {
171 return String.format("s=%s i=%b a=%s",_state,_initial,_async);
172 }
173
174 public String getStatusString()
175 {
176 try(Locker.Lock lock= _locker.lock())
177 {
178 return getStatusStringLocked();
179 }
180 }
181
182
183
184
185 protected Action handling()
186 {
187 if(DEBUG)
188 LOG.debug("{} handling {}",this,_state);
189 try(Locker.Lock lock= _locker.lock())
190 {
191 switch(_state)
192 {
193 case IDLE:
194 _initial=true;
195 _state=State.DISPATCHED;
196 return Action.DISPATCH;
197
198 case COMPLETING:
199 return Action.COMPLETE;
200
201 case COMPLETED:
202 return Action.TERMINATED;
203
204 case ASYNC_WOKEN:
205 if (_asyncReadPossible)
206 {
207 _state=State.ASYNC_IO;
208 _asyncReadUnready=false;
209 return Action.READ_CALLBACK;
210 }
211
212 if (_asyncWrite)
213 {
214 _state=State.ASYNC_IO;
215 _asyncWrite=false;
216 return Action.WRITE_CALLBACK;
217 }
218
219 if (_async!=null)
220 {
221 Async async=_async;
222 switch(async)
223 {
224 case COMPLETE:
225 _state=State.COMPLETING;
226 return Action.COMPLETE;
227 case DISPATCH:
228 _state=State.DISPATCHED;
229 _async=null;
230 return Action.ASYNC_DISPATCH;
231 case EXPIRING:
232 break;
233 case EXPIRED:
234 _state=State.DISPATCHED;
235 _async=null;
236 return Action.ERROR_DISPATCH;
237 case STARTED:
238 return Action.WAIT;
239 case ERRORING:
240 _state=State.DISPATCHED;
241 return Action.ASYNC_ERROR;
242
243 default:
244 throw new IllegalStateException(getStatusStringLocked());
245 }
246 }
247
248 return Action.WAIT;
249
250 case ASYNC_IO:
251 case ASYNC_WAIT:
252 case DISPATCHED:
253 case UPGRADED:
254 default:
255 throw new IllegalStateException(getStatusStringLocked());
256
257 }
258 }
259 }
260
261 public void startAsync(AsyncContextEvent event)
262 {
263 final List<AsyncListener> lastAsyncListeners;
264
265 try(Locker.Lock lock= _locker.lock())
266 {
267 if (_state!=State.DISPATCHED || _async!=null)
268 throw new IllegalStateException(this.getStatusStringLocked());
269
270 _async=Async.STARTED;
271 _event=event;
272 lastAsyncListeners=_asyncListeners;
273 _asyncListeners=null;
274 }
275
276 if (lastAsyncListeners!=null)
277 {
278 Runnable callback=new Runnable()
279 {
280 @Override
281 public void run()
282 {
283 for (AsyncListener listener : lastAsyncListeners)
284 {
285 try
286 {
287 listener.onStartAsync(event);
288 }
289 catch(Exception e)
290 {
291
292 LOG.warn(e);
293 }
294 }
295 }
296 @Override
297 public String toString()
298 {
299 return "startAsync";
300 }
301 };
302
303 runInContext(event,callback);
304 }
305 }
306
307 protected void error(Throwable th)
308 {
309 try(Locker.Lock lock= _locker.lock())
310 {
311 if (_event!=null)
312 _event.addThrowable(th);
313 _async=Async.ERRORING;
314 }
315 }
316
317
318
319
320
321
322
323
324 protected Action unhandle()
325 {
326 Action action;
327 AsyncContextEvent schedule_event=null;
328 boolean read_interested=false;
329
330 if(DEBUG)
331 LOG.debug("{} unhandle {}",this,_state);
332
333 try(Locker.Lock lock= _locker.lock())
334 {
335 switch(_state)
336 {
337 case COMPLETING:
338 case COMPLETED:
339 return Action.TERMINATED;
340
341 case DISPATCHED:
342 case ASYNC_IO:
343 break;
344
345 default:
346 throw new IllegalStateException(this.getStatusStringLocked());
347 }
348
349 if (_async!=null)
350 {
351 _initial=false;
352 switch(_async)
353 {
354 case COMPLETE:
355 _state=State.COMPLETING;
356 _async=null;
357 action=Action.COMPLETE;
358 break;
359
360 case DISPATCH:
361 _state=State.DISPATCHED;
362 _async=null;
363 action=Action.ASYNC_DISPATCH;
364 break;
365
366 case EXPIRED:
367 _state=State.DISPATCHED;
368 _async=null;
369 action = Action.ERROR_DISPATCH;
370 break;
371
372 case STARTED:
373 if (_asyncReadUnready && _asyncReadPossible)
374 {
375 _state=State.ASYNC_IO;
376 _asyncReadUnready=false;
377 action = Action.READ_CALLBACK;
378 }
379 else if (_asyncWrite)
380 {
381 _asyncWrite=false;
382 _state=State.ASYNC_IO;
383 action=Action.WRITE_CALLBACK;
384 }
385 else
386 {
387 schedule_event=_event;
388 read_interested=_asyncReadUnready;
389 _state=State.ASYNC_WAIT;
390 action=Action.WAIT;
391 }
392 break;
393
394 case EXPIRING:
395 schedule_event=_event;
396 _state=State.ASYNC_WAIT;
397 action=Action.WAIT;
398 break;
399
400 case ERRORING:
401 _state=State.DISPATCHED;
402 action=Action.ASYNC_ERROR;
403 break;
404
405 case ERRORED:
406 _state=State.DISPATCHED;
407 action=Action.ERROR_DISPATCH;
408 _async=null;
409 break;
410
411 default:
412 _state=State.COMPLETING;
413 action=Action.COMPLETE;
414 break;
415 }
416 }
417 else
418 {
419 _state=State.COMPLETING;
420 action=Action.COMPLETE;
421 }
422 }
423
424 if (schedule_event!=null)
425 scheduleTimeout(schedule_event);
426 if (read_interested)
427 _channel.asyncReadFillInterested();
428 return action;
429 }
430
431 public void dispatch(ServletContext context, String path)
432 {
433 boolean dispatch=false;
434 AsyncContextEvent event=null;
435 try(Locker.Lock lock= _locker.lock())
436 {
437 boolean started=false;
438 event=_event;
439 switch(_async)
440 {
441 case STARTED:
442 started=true;
443 break;
444 case EXPIRING:
445 case ERRORED:
446 break;
447 default:
448 throw new IllegalStateException(this.getStatusStringLocked());
449 }
450 _async=Async.DISPATCH;
451
452 if (context!=null)
453 _event.setDispatchContext(context);
454 if (path!=null)
455 _event.setDispatchPath(path);
456
457 if (started)
458 {
459 switch(_state)
460 {
461 case DISPATCHED:
462 case ASYNC_IO:
463 case ASYNC_WOKEN:
464 break;
465 case ASYNC_WAIT:
466 _state=State.ASYNC_WOKEN;
467 dispatch=true;
468 break;
469 default:
470 LOG.warn("async dispatched when complete {}",this);
471 break;
472 }
473 }
474 }
475
476 cancelTimeout(event);
477 if (dispatch)
478 scheduleDispatch();
479 }
480
481 protected void onTimeout()
482 {
483 final List<AsyncListener> listeners;
484 AsyncContextEvent event;
485 try(Locker.Lock lock= _locker.lock())
486 {
487 if (_async!=Async.STARTED)
488 return;
489 _async=Async.EXPIRING;
490 event=_event;
491 listeners=_asyncListeners;
492
493 }
494
495 if (LOG.isDebugEnabled())
496 LOG.debug("Async timeout {}",this);
497
498 if (listeners!=null)
499 {
500 Runnable callback=new Runnable()
501 {
502 @Override
503 public void run()
504 {
505 for (AsyncListener listener : listeners)
506 {
507 try
508 {
509 listener.onTimeout(event);
510 }
511 catch(Exception e)
512 {
513 LOG.debug(e);
514 event.addThrowable(e);
515 _channel.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
516 break;
517 }
518 }
519 }
520 @Override
521 public String toString()
522 {
523 return "onTimeout";
524 }
525 };
526
527 runInContext(event,callback);
528 }
529
530 boolean dispatch=false;
531 try(Locker.Lock lock= _locker.lock())
532 {
533 switch(_async)
534 {
535 case EXPIRING:
536 if (event.getThrowable()==null)
537 {
538 _async=Async.EXPIRED;
539 _event.addThrowable(new TimeoutException("Async API violation"));
540 }
541 else
542 {
543 _async=Async.ERRORING;
544 }
545 break;
546
547 case COMPLETE:
548 case DISPATCH:
549 break;
550
551 default:
552 throw new IllegalStateException();
553 }
554
555 if (_state==State.ASYNC_WAIT)
556 {
557 _state=State.ASYNC_WOKEN;
558 dispatch=true;
559 }
560 }
561
562 if (dispatch)
563 {
564 if (LOG.isDebugEnabled())
565 LOG.debug("Dispatch after async timeout {}",this);
566 scheduleDispatch();
567 }
568 }
569
570 public void complete()
571 {
572
573 boolean handle=false;
574 AsyncContextEvent event=null;
575 try(Locker.Lock lock= _locker.lock())
576 {
577 boolean started=false;
578 event=_event;
579
580 switch(_async)
581 {
582 case STARTED:
583 started=true;
584 break;
585 case EXPIRING:
586 case ERRORED:
587 break;
588 default:
589 throw new IllegalStateException(this.getStatusStringLocked());
590 }
591 _async=Async.COMPLETE;
592
593 if (started && _state==State.ASYNC_WAIT)
594 {
595 handle=true;
596 _state=State.ASYNC_WOKEN;
597 }
598 }
599
600 cancelTimeout(event);
601 if (handle)
602 runInContext(event,_channel);
603 }
604
605 public void errorComplete()
606 {
607 try(Locker.Lock lock= _locker.lock())
608 {
609 _async=Async.COMPLETE;
610 _event.setDispatchContext(null);
611 _event.setDispatchPath(null);
612 }
613
614 cancelTimeout();
615 }
616
617 protected void onError()
618 {
619 final List<AsyncListener> aListeners;
620 final AsyncContextEvent event;
621
622 try(Locker.Lock lock= _locker.lock())
623 {
624 if (_state!=State.DISPATCHED
625 throw new IllegalStateException(this.getStatusStringLocked());
626
627 aListeners=_asyncListeners;
628 event=_event;
629 _async=Async.ERRORED;
630 }
631
632 if (event!=null && aListeners!=null)
633 {
634 event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
635 event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage());
636 for (AsyncListener listener : aListeners)
637 {
638 try
639 {
640 listener.onError(event);
641 }
642 catch(Exception x)
643 {
644 LOG.info("Exception while invoking listener " + listener, x);
645 }
646 }
647 }
648 }
649
650
651 protected void onComplete()
652 {
653 final List<AsyncListener> aListeners;
654 final AsyncContextEvent event;
655
656 try(Locker.Lock lock= _locker.lock())
657 {
658 switch(_state)
659 {
660 case COMPLETING:
661 aListeners=_asyncListeners;
662 event=_event;
663 _state=State.COMPLETED;
664 _async=null;
665 break;
666
667 default:
668 throw new IllegalStateException(this.getStatusStringLocked());
669 }
670 }
671
672 if (event!=null)
673 {
674 if (aListeners!=null)
675 {
676 Runnable callback = new Runnable()
677 {
678 @Override
679 public void run()
680 {
681 for (AsyncListener listener : aListeners)
682 {
683 try
684 {
685 listener.onComplete(event);
686 }
687 catch(Exception e)
688 {
689 LOG.warn(e);
690 }
691 }
692 }
693 @Override
694 public String toString()
695 {
696 return "onComplete";
697 }
698 };
699
700 runInContext(event,callback);
701 }
702 event.completed();
703 }
704 }
705
706 protected void recycle()
707 {
708 cancelTimeout();
709 try(Locker.Lock lock= _locker.lock())
710 {
711 switch(_state)
712 {
713 case DISPATCHED:
714 case ASYNC_IO:
715 throw new IllegalStateException(getStatusStringLocked());
716 case UPGRADED:
717 return;
718 default:
719 break;
720 }
721 _asyncListeners=null;
722 _state=State.IDLE;
723 _async=null;
724 _initial=true;
725 _asyncReadPossible=_asyncReadUnready=false;
726 _asyncWrite=false;
727 _timeoutMs=DEFAULT_TIMEOUT;
728 _event=null;
729 }
730 }
731
732 public void upgrade()
733 {
734 cancelTimeout();
735 try(Locker.Lock lock= _locker.lock())
736 {
737 switch(_state)
738 {
739 case IDLE:
740 case COMPLETED:
741 break;
742 default:
743 throw new IllegalStateException(getStatusStringLocked());
744 }
745 _asyncListeners=null;
746 _state=State.UPGRADED;
747 _async=null;
748 _initial=true;
749 _asyncReadPossible=_asyncReadUnready=false;
750 _asyncWrite=false;
751 _timeoutMs=DEFAULT_TIMEOUT;
752 _event=null;
753 }
754 }
755
756 protected void scheduleDispatch()
757 {
758 _channel.execute(_channel);
759 }
760
761 protected void scheduleTimeout(AsyncContextEvent event)
762 {
763 Scheduler scheduler = _channel.getScheduler();
764 if (scheduler!=null && _timeoutMs>0)
765 event.setTimeoutTask(scheduler.schedule(event,_timeoutMs,TimeUnit.MILLISECONDS));
766 }
767
768 protected void cancelTimeout()
769 {
770 final AsyncContextEvent event;
771 try(Locker.Lock lock= _locker.lock())
772 {
773 event=_event;
774 }
775 cancelTimeout(event);
776 }
777
778 protected void cancelTimeout(AsyncContextEvent event)
779 {
780 if (event!=null)
781 event.cancelTimeoutTask();
782 }
783
784 public boolean isIdle()
785 {
786 try(Locker.Lock lock= _locker.lock())
787 {
788 return _state==State.IDLE;
789 }
790 }
791
792 public boolean isExpired()
793 {
794 try(Locker.Lock lock= _locker.lock())
795 {
796 return _async==Async.EXPIRED;
797 }
798 }
799
800 public boolean isInitial()
801 {
802 try(Locker.Lock lock= _locker.lock())
803 {
804 return _initial;
805 }
806 }
807
808 public boolean isSuspended()
809 {
810 try(Locker.Lock lock= _locker.lock())
811 {
812 return _state==State.ASYNC_WAIT || _state==State.DISPATCHED && _async==Async.STARTED;
813 }
814 }
815
816 boolean isCompleting()
817 {
818 try(Locker.Lock lock= _locker.lock())
819 {
820 return _state==State.COMPLETING;
821 }
822 }
823
824 boolean isCompleted()
825 {
826 try(Locker.Lock lock= _locker.lock())
827 {
828 return _state == State.COMPLETED;
829 }
830 }
831
832 public boolean isAsyncStarted()
833 {
834 try(Locker.Lock lock= _locker.lock())
835 {
836 if (_state==State.DISPATCHED)
837 return _async!=null;
838 return _async==Async.STARTED || _async==Async.EXPIRING;
839 }
840 }
841
842 public boolean isAsync()
843 {
844 try(Locker.Lock lock= _locker.lock())
845 {
846 return !_initial || _async!=null;
847 }
848 }
849
850 public Request getBaseRequest()
851 {
852 return _channel.getRequest();
853 }
854
855 public HttpChannel getHttpChannel()
856 {
857 return _channel;
858 }
859
860 public ContextHandler getContextHandler()
861 {
862 final AsyncContextEvent event;
863 try(Locker.Lock lock= _locker.lock())
864 {
865 event=_event;
866 }
867 return getContextHandler(event);
868 }
869
870 ContextHandler getContextHandler(AsyncContextEvent event)
871 {
872 if (event!=null)
873 {
874 Context context=((Context)event.getServletContext());
875 if (context!=null)
876 return context.getContextHandler();
877 }
878 return null;
879 }
880
881 public ServletResponse getServletResponse()
882 {
883 final AsyncContextEvent event;
884 try(Locker.Lock lock= _locker.lock())
885 {
886 event=_event;
887 }
888 return getServletResponse(event);
889 }
890
891 public ServletResponse getServletResponse(AsyncContextEvent event)
892 {
893 if (event!=null && event.getSuppliedResponse()!=null)
894 return event.getSuppliedResponse();
895 return _channel.getResponse();
896 }
897
898 void runInContext(AsyncContextEvent event,Runnable runnable)
899 {
900 ContextHandler contextHandler = getContextHandler(event);
901 if (contextHandler==null)
902 runnable.run();
903 else
904 contextHandler.handle(_channel.getRequest(),runnable);
905 }
906
907 public Object getAttribute(String name)
908 {
909 return _channel.getRequest().getAttribute(name);
910 }
911
912 public void removeAttribute(String name)
913 {
914 _channel.getRequest().removeAttribute(name);
915 }
916
917 public void setAttribute(String name, Object attribute)
918 {
919 _channel.getRequest().setAttribute(name,attribute);
920 }
921
922
923
924
925
926
927
928
929
930 public void onReadUnready()
931 {
932 boolean interested=false;
933 try(Locker.Lock lock= _locker.lock())
934 {
935
936 if (!_asyncReadUnready)
937 {
938 _asyncReadUnready=true;
939 _asyncReadPossible=false;
940 if (_state==State.ASYNC_WAIT)
941 interested=true;
942 }
943 }
944
945 if (interested)
946 _channel.asyncReadFillInterested();
947 }
948
949
950
951
952
953
954
955
956 public boolean onReadPossible()
957 {
958 boolean woken=false;
959 try(Locker.Lock lock= _locker.lock())
960 {
961 _asyncReadPossible=true;
962 if (_state==State.ASYNC_WAIT && _asyncReadUnready)
963 {
964 woken=true;
965 _state=State.ASYNC_WOKEN;
966 }
967 }
968 return woken;
969 }
970
971
972
973
974
975
976
977
978 public boolean onReadReady()
979 {
980 boolean woken=false;
981 try(Locker.Lock lock= _locker.lock())
982 {
983 _asyncReadUnready=true;
984 _asyncReadPossible=true;
985 if (_state==State.ASYNC_WAIT)
986 {
987 woken=true;
988 _state=State.ASYNC_WOKEN;
989 }
990 }
991 return woken;
992 }
993
994 public boolean isReadPossible()
995 {
996 try(Locker.Lock lock= _locker.lock())
997 {
998 return _asyncReadPossible;
999 }
1000 }
1001
1002 public boolean onWritePossible()
1003 {
1004 boolean handle=false;
1005
1006 try(Locker.Lock lock= _locker.lock())
1007 {
1008 _asyncWrite=true;
1009 if (_state==State.ASYNC_WAIT)
1010 {
1011 _state=State.ASYNC_WOKEN;
1012 handle=true;
1013 }
1014 }
1015
1016 return handle;
1017 }
1018
1019 }