1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.server;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25
26 import javax.servlet.AsyncListener;
27 import javax.servlet.RequestDispatcher;
28 import javax.servlet.ServletContext;
29 import javax.servlet.ServletResponse;
30
31 import org.eclipse.jetty.server.handler.ContextHandler;
32 import org.eclipse.jetty.server.handler.ContextHandler.Context;
33 import org.eclipse.jetty.util.log.Log;
34 import org.eclipse.jetty.util.log.Logger;
35 import org.eclipse.jetty.util.thread.Locker;
36 import org.eclipse.jetty.util.thread.Scheduler;
37
38
39
40
41 public class HttpChannelState
42 {
43 private static final Logger LOG = Log.getLogger(HttpChannelState.class);
44
45 private final static long DEFAULT_TIMEOUT=Long.getLong("org.eclipse.jetty.server.HttpChannelState.DEFAULT_TIMEOUT",30000L);
46
47
48
49
50 public enum State
51 {
52 IDLE,
53 DISPATCHED,
54 ASYNC_WAIT,
55 ASYNC_WOKEN,
56 ASYNC_IO,
57 COMPLETING,
58 COMPLETED,
59 UPGRADED
60 }
61
62
63
64
65 public enum Action
66 {
67 DISPATCH,
68 ASYNC_DISPATCH,
69 ERROR_DISPATCH,
70 ASYNC_ERROR,
71 WRITE_CALLBACK,
72 READ_CALLBACK,
73 COMPLETE,
74 TERMINATED,
75 WAIT,
76 }
77
78
79
80
81
82
83 public enum Async
84 {
85 NOT_ASYNC,
86 STARTED,
87 DISPATCH,
88 COMPLETE,
89 EXPIRING,
90 EXPIRED,
91 ERRORING,
92 ERRORED
93 }
94
95 private final boolean DEBUG=LOG.isDebugEnabled();
96 private final Locker _locker=new Locker();
97 private final HttpChannel _channel;
98
99 private List<AsyncListener> _asyncListeners;
100 private State _state;
101 private Async _async;
102 private boolean _initial;
103 private boolean _asyncReadPossible;
104 private boolean _asyncReadUnready;
105 private boolean _asyncWrite;
106 private long _timeoutMs=DEFAULT_TIMEOUT;
107 private AsyncContextEvent _event;
108
109 protected HttpChannelState(HttpChannel channel)
110 {
111 _channel=channel;
112 _state=State.IDLE;
113 _async=Async.NOT_ASYNC;
114 _initial=true;
115 }
116
117 public State getState()
118 {
119 try(Locker.Lock lock= _locker.lock())
120 {
121 return _state;
122 }
123 }
124
125 public void addListener(AsyncListener listener)
126 {
127 try(Locker.Lock lock= _locker.lock())
128 {
129 if (_asyncListeners==null)
130 _asyncListeners=new ArrayList<>();
131 _asyncListeners.add(listener);
132 }
133 }
134
135 public void setTimeout(long ms)
136 {
137 try(Locker.Lock lock= _locker.lock())
138 {
139 _timeoutMs=ms;
140 }
141 }
142
143 public long getTimeout()
144 {
145 try(Locker.Lock lock= _locker.lock())
146 {
147 return _timeoutMs;
148 }
149 }
150
151 public AsyncContextEvent getAsyncContextEvent()
152 {
153 try(Locker.Lock lock= _locker.lock())
154 {
155 return _event;
156 }
157 }
158
159 @Override
160 public String toString()
161 {
162 try(Locker.Lock lock= _locker.lock())
163 {
164 return String.format("%s@%x{s=%s a=%s i=%b r=%s w=%b}",getClass().getSimpleName(),hashCode(),_state,_async,_initial,
165 _asyncReadPossible?(_asyncReadUnready?"PU":"P!U"):(_asyncReadUnready?"!PU":"!P!U"),
166 _asyncWrite);
167 }
168 }
169
170 private String getStatusStringLocked()
171 {
172 return String.format("s=%s i=%b a=%s",_state,_initial,_async);
173 }
174
175 public String getStatusString()
176 {
177 try(Locker.Lock lock= _locker.lock())
178 {
179 return getStatusStringLocked();
180 }
181 }
182
183
184
185
186 protected Action handling()
187 {
188 if(DEBUG)
189 LOG.debug("{} handling {}",this,_state);
190 try(Locker.Lock lock= _locker.lock())
191 {
192 switch(_state)
193 {
194 case IDLE:
195 _initial=true;
196 _state=State.DISPATCHED;
197 return Action.DISPATCH;
198
199 case COMPLETING:
200 case COMPLETED:
201 return Action.TERMINATED;
202
203 case ASYNC_WOKEN:
204 if (_asyncReadPossible)
205 {
206 _state=State.ASYNC_IO;
207 _asyncReadUnready=false;
208 return Action.READ_CALLBACK;
209 }
210
211 if (_asyncWrite)
212 {
213 _state=State.ASYNC_IO;
214 _asyncWrite=false;
215 return Action.WRITE_CALLBACK;
216 }
217
218 switch(_async)
219 {
220 case COMPLETE:
221 _state=State.COMPLETING;
222 return Action.COMPLETE;
223 case DISPATCH:
224 _state=State.DISPATCHED;
225 _async=Async.NOT_ASYNC;
226 return Action.ASYNC_DISPATCH;
227 case EXPIRING:
228 break;
229 case EXPIRED:
230 _state=State.DISPATCHED;
231 _async=Async.NOT_ASYNC;
232 return Action.ERROR_DISPATCH;
233 case STARTED:
234 return Action.WAIT;
235 case ERRORING:
236 _state=State.DISPATCHED;
237 return Action.ASYNC_ERROR;
238 case NOT_ASYNC:
239 break;
240 default:
241 throw new IllegalStateException(getStatusStringLocked());
242 }
243
244 return Action.WAIT;
245
246 case ASYNC_IO:
247 case ASYNC_WAIT:
248 case DISPATCHED:
249 case UPGRADED:
250 default:
251 throw new IllegalStateException(getStatusStringLocked());
252
253 }
254 }
255 }
256
257 public void startAsync(AsyncContextEvent event)
258 {
259 final List<AsyncListener> lastAsyncListeners;
260
261 try(Locker.Lock lock= _locker.lock())
262 {
263 if (_state!=State.DISPATCHED || _async!=Async.NOT_ASYNC)
264 throw new IllegalStateException(this.getStatusStringLocked());
265
266 _async=Async.STARTED;
267 _event=event;
268 lastAsyncListeners=_asyncListeners;
269 _asyncListeners=null;
270 }
271
272 if (lastAsyncListeners!=null)
273 {
274 Runnable callback=new Runnable()
275 {
276 @Override
277 public void run()
278 {
279 for (AsyncListener listener : lastAsyncListeners)
280 {
281 try
282 {
283 listener.onStartAsync(event);
284 }
285 catch(Throwable e)
286 {
287
288 LOG.warn(e);
289 }
290 }
291 }
292 @Override
293 public String toString()
294 {
295 return "startAsync";
296 }
297 };
298
299 runInContext(event,callback);
300 }
301 }
302
303 protected void error(Throwable th)
304 {
305 try(Locker.Lock lock= _locker.lock())
306 {
307 if (_event!=null)
308 _event.addThrowable(th);
309 _async=Async.ERRORING;
310 }
311 }
312
313 public void asyncError(Throwable failure)
314 {
315 AsyncContextEvent event = null;
316 try (Locker.Lock lock= _locker.lock())
317 {
318 switch (_state)
319 {
320 case IDLE:
321 case DISPATCHED:
322 case COMPLETING:
323 case COMPLETED:
324 case UPGRADED:
325 case ASYNC_IO:
326 case ASYNC_WOKEN:
327 {
328 break;
329 }
330 case ASYNC_WAIT:
331 {
332 _event.addThrowable(failure);
333 _state=State.ASYNC_WOKEN;
334 _async=Async.ERRORING;
335 event=_event;
336 break;
337 }
338 default:
339 {
340 throw new IllegalStateException(getStatusStringLocked());
341 }
342 }
343 }
344
345 if (event != null)
346 {
347 cancelTimeout(event);
348 runInContext(event, _channel);
349 }
350 }
351
352
353
354
355
356
357
358
359 protected Action unhandle()
360 {
361 Action action;
362 boolean read_interested=false;
363
364 if(DEBUG)
365 LOG.debug("{} unhandle {}",this,_state);
366
367 try(Locker.Lock lock= _locker.lock())
368 {
369 switch(_state)
370 {
371 case COMPLETING:
372 case COMPLETED:
373 return Action.TERMINATED;
374
375 case DISPATCHED:
376 case ASYNC_IO:
377 break;
378
379 default:
380 throw new IllegalStateException(this.getStatusStringLocked());
381 }
382
383 _initial=false;
384 switch(_async)
385 {
386 case COMPLETE:
387 _state=State.COMPLETING;
388 _async=Async.NOT_ASYNC;
389 action=Action.COMPLETE;
390 break;
391
392 case DISPATCH:
393 _state=State.DISPATCHED;
394 _async=Async.NOT_ASYNC;
395 action=Action.ASYNC_DISPATCH;
396 break;
397
398 case EXPIRED:
399 _state=State.DISPATCHED;
400 _async=Async.NOT_ASYNC;
401 action = Action.ERROR_DISPATCH;
402 break;
403
404 case STARTED:
405 if (_asyncReadUnready && _asyncReadPossible)
406 {
407 _state=State.ASYNC_IO;
408 _asyncReadUnready=false;
409 action = Action.READ_CALLBACK;
410 }
411 else if (_asyncWrite)
412 {
413 _asyncWrite=false;
414 _state=State.ASYNC_IO;
415 action=Action.WRITE_CALLBACK;
416 }
417 else
418 {
419 _state=State.ASYNC_WAIT;
420 action=Action.WAIT;
421 if (_asyncReadUnready)
422 read_interested=true;
423 Scheduler scheduler=_channel.getScheduler();
424 if (scheduler!=null && _timeoutMs>0)
425 _event.setTimeoutTask(scheduler.schedule(_event,_timeoutMs,TimeUnit.MILLISECONDS));
426 }
427 break;
428
429 case EXPIRING:
430 _state=State.ASYNC_WAIT;
431 action=Action.WAIT;
432 break;
433
434 case ERRORING:
435 _state=State.DISPATCHED;
436 action=Action.ASYNC_ERROR;
437 break;
438
439 case ERRORED:
440 _state=State.DISPATCHED;
441 action=Action.ERROR_DISPATCH;
442 _async=Async.NOT_ASYNC;
443 break;
444
445 case NOT_ASYNC:
446 _state=State.COMPLETING;
447 action=Action.COMPLETE;
448 break;
449
450 default:
451 _state=State.COMPLETING;
452 action=Action.COMPLETE;
453 break;
454 }
455 }
456
457 if (read_interested)
458 _channel.asyncReadFillInterested();
459
460 return action;
461 }
462
463 public void dispatch(ServletContext context, String path)
464 {
465 boolean dispatch=false;
466 AsyncContextEvent event=null;
467 try(Locker.Lock lock= _locker.lock())
468 {
469 boolean started=false;
470 event=_event;
471 switch(_async)
472 {
473 case STARTED:
474 started=true;
475 break;
476 case EXPIRING:
477 case ERRORED:
478 break;
479 default:
480 throw new IllegalStateException(this.getStatusStringLocked());
481 }
482 _async=Async.DISPATCH;
483
484 if (context!=null)
485 _event.setDispatchContext(context);
486 if (path!=null)
487 _event.setDispatchPath(path);
488
489 if (started)
490 {
491 switch(_state)
492 {
493 case DISPATCHED:
494 case ASYNC_IO:
495 case ASYNC_WOKEN:
496 break;
497 case ASYNC_WAIT:
498 _state=State.ASYNC_WOKEN;
499 dispatch=true;
500 break;
501 default:
502 LOG.warn("async dispatched when complete {}",this);
503 break;
504 }
505 }
506 }
507
508 cancelTimeout(event);
509 if (dispatch)
510 scheduleDispatch();
511 }
512
513 protected void onTimeout()
514 {
515 final List<AsyncListener> listeners;
516 AsyncContextEvent event;
517 try(Locker.Lock lock= _locker.lock())
518 {
519 if (_async!=Async.STARTED)
520 return;
521 _async=Async.EXPIRING;
522 event=_event;
523 listeners=_asyncListeners;
524
525 }
526
527 if (LOG.isDebugEnabled())
528 LOG.debug("Async timeout {}",this);
529
530 if (listeners!=null)
531 {
532 Runnable callback=new Runnable()
533 {
534 @Override
535 public void run()
536 {
537 for (AsyncListener listener : listeners)
538 {
539 try
540 {
541 listener.onTimeout(event);
542 }
543 catch(Throwable e)
544 {
545 LOG.debug(e);
546 event.addThrowable(e);
547 _channel.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
548 break;
549 }
550 }
551 }
552 @Override
553 public String toString()
554 {
555 return "onTimeout";
556 }
557 };
558
559 runInContext(event,callback);
560 }
561
562 boolean dispatch=false;
563 try(Locker.Lock lock= _locker.lock())
564 {
565 switch(_async)
566 {
567 case EXPIRING:
568 if (event.getThrowable()==null)
569 {
570 _async=Async.EXPIRED;
571 _event.addThrowable(new TimeoutException("Async API violation"));
572 }
573 else
574 {
575 _async=Async.ERRORING;
576 }
577 break;
578
579 case COMPLETE:
580 case DISPATCH:
581 break;
582
583 default:
584 throw new IllegalStateException();
585 }
586
587 if (_state==State.ASYNC_WAIT)
588 {
589 _state=State.ASYNC_WOKEN;
590 dispatch=true;
591 }
592 }
593
594 if (dispatch)
595 {
596 if (LOG.isDebugEnabled())
597 LOG.debug("Dispatch after async timeout {}",this);
598 scheduleDispatch();
599 }
600 }
601
602 public void complete()
603 {
604
605 boolean handle=false;
606 AsyncContextEvent event=null;
607 try(Locker.Lock lock= _locker.lock())
608 {
609 boolean started=false;
610 event=_event;
611
612 switch(_async)
613 {
614 case STARTED:
615 started=true;
616 break;
617 case EXPIRING:
618 case ERRORED:
619 break;
620 default:
621 throw new IllegalStateException(this.getStatusStringLocked());
622 }
623 _async=Async.COMPLETE;
624
625 if (started && _state==State.ASYNC_WAIT)
626 {
627 handle=true;
628 _state=State.ASYNC_WOKEN;
629 }
630 }
631
632 cancelTimeout(event);
633 if (handle)
634 runInContext(event,_channel);
635 }
636
637 public void errorComplete()
638 {
639 try(Locker.Lock lock= _locker.lock())
640 {
641 _async=Async.COMPLETE;
642 _event.setDispatchContext(null);
643 _event.setDispatchPath(null);
644 }
645
646 cancelTimeout();
647 }
648
649 protected void onError()
650 {
651 final List<AsyncListener> aListeners;
652 final AsyncContextEvent event;
653
654 try(Locker.Lock lock= _locker.lock())
655 {
656 if (_state!=State.DISPATCHED
657 throw new IllegalStateException(this.getStatusStringLocked());
658
659 aListeners=_asyncListeners;
660 event=_event;
661 _async=Async.ERRORED;
662 }
663
664 if (event!=null && aListeners!=null)
665 {
666 event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
667 event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage());
668 for (AsyncListener listener : aListeners)
669 {
670 try
671 {
672 listener.onError(event);
673 }
674 catch(Throwable x)
675 {
676 LOG.info("Exception while invoking listener " + listener, x);
677 }
678 }
679 }
680 }
681
682
683 protected void onComplete()
684 {
685 final List<AsyncListener> aListeners;
686 final AsyncContextEvent event;
687
688 try(Locker.Lock lock= _locker.lock())
689 {
690 switch(_state)
691 {
692 case COMPLETING:
693 aListeners=_asyncListeners;
694 event=_event;
695 _state=State.COMPLETED;
696 _async=Async.NOT_ASYNC;
697 break;
698
699 default:
700 throw new IllegalStateException(this.getStatusStringLocked());
701 }
702 }
703
704 if (event!=null)
705 {
706 if (aListeners!=null)
707 {
708 Runnable callback = new Runnable()
709 {
710 @Override
711 public void run()
712 {
713 for (AsyncListener listener : aListeners)
714 {
715 try
716 {
717 listener.onComplete(event);
718 }
719 catch(Throwable e)
720 {
721 LOG.warn(e);
722 }
723 }
724 }
725 @Override
726 public String toString()
727 {
728 return "onComplete";
729 }
730 };
731
732 runInContext(event,callback);
733 }
734 event.completed();
735 }
736 }
737
738 protected void recycle()
739 {
740 cancelTimeout();
741 try(Locker.Lock lock= _locker.lock())
742 {
743 switch(_state)
744 {
745 case DISPATCHED:
746 case ASYNC_IO:
747 throw new IllegalStateException(getStatusStringLocked());
748 case UPGRADED:
749 return;
750 default:
751 break;
752 }
753 _asyncListeners=null;
754 _state=State.IDLE;
755 _async=Async.NOT_ASYNC;
756 _initial=true;
757 _asyncReadPossible=_asyncReadUnready=false;
758 _asyncWrite=false;
759 _timeoutMs=DEFAULT_TIMEOUT;
760 _event=null;
761 }
762 }
763
764 public void upgrade()
765 {
766 cancelTimeout();
767 try(Locker.Lock lock= _locker.lock())
768 {
769 switch(_state)
770 {
771 case IDLE:
772 case COMPLETED:
773 break;
774 default:
775 throw new IllegalStateException(getStatusStringLocked());
776 }
777 _asyncListeners=null;
778 _state=State.UPGRADED;
779 _async=Async.NOT_ASYNC;
780 _initial=true;
781 _asyncReadPossible=_asyncReadUnready=false;
782 _asyncWrite=false;
783 _timeoutMs=DEFAULT_TIMEOUT;
784 _event=null;
785 }
786 }
787
788 protected void scheduleDispatch()
789 {
790 _channel.execute(_channel);
791 }
792
793 protected void cancelTimeout()
794 {
795 final AsyncContextEvent event;
796 try(Locker.Lock lock= _locker.lock())
797 {
798 event=_event;
799 }
800 cancelTimeout(event);
801 }
802
803 protected void cancelTimeout(AsyncContextEvent event)
804 {
805 if (event!=null)
806 event.cancelTimeoutTask();
807 }
808
809 public boolean isIdle()
810 {
811 try(Locker.Lock lock= _locker.lock())
812 {
813 return _state==State.IDLE;
814 }
815 }
816
817 public boolean isExpired()
818 {
819 try(Locker.Lock lock= _locker.lock())
820 {
821 return _async==Async.EXPIRED;
822 }
823 }
824
825 public boolean isInitial()
826 {
827 try(Locker.Lock lock= _locker.lock())
828 {
829 return _initial;
830 }
831 }
832
833 public boolean isSuspended()
834 {
835 try(Locker.Lock lock= _locker.lock())
836 {
837 return _state==State.ASYNC_WAIT || _state==State.DISPATCHED && _async==Async.STARTED;
838 }
839 }
840
841 boolean isCompleting()
842 {
843 try(Locker.Lock lock= _locker.lock())
844 {
845 return _state==State.COMPLETING;
846 }
847 }
848
849 boolean isCompleted()
850 {
851 try(Locker.Lock lock= _locker.lock())
852 {
853 return _state == State.COMPLETED;
854 }
855 }
856
857 public boolean isAsyncStarted()
858 {
859 try(Locker.Lock lock= _locker.lock())
860 {
861 if (_state==State.DISPATCHED)
862 return _async!=Async.NOT_ASYNC;
863 return _async==Async.STARTED || _async==Async.EXPIRING;
864 }
865 }
866
867 public boolean isAsync()
868 {
869 try(Locker.Lock lock= _locker.lock())
870 {
871 return !_initial || _async!=Async.NOT_ASYNC;
872 }
873 }
874
875 public Request getBaseRequest()
876 {
877 return _channel.getRequest();
878 }
879
880 public HttpChannel getHttpChannel()
881 {
882 return _channel;
883 }
884
885 public ContextHandler getContextHandler()
886 {
887 final AsyncContextEvent event;
888 try(Locker.Lock lock= _locker.lock())
889 {
890 event=_event;
891 }
892 return getContextHandler(event);
893 }
894
895 ContextHandler getContextHandler(AsyncContextEvent event)
896 {
897 if (event!=null)
898 {
899 Context context=((Context)event.getServletContext());
900 if (context!=null)
901 return context.getContextHandler();
902 }
903 return null;
904 }
905
906 public ServletResponse getServletResponse()
907 {
908 final AsyncContextEvent event;
909 try(Locker.Lock lock= _locker.lock())
910 {
911 event=_event;
912 }
913 return getServletResponse(event);
914 }
915
916 public ServletResponse getServletResponse(AsyncContextEvent event)
917 {
918 if (event!=null && event.getSuppliedResponse()!=null)
919 return event.getSuppliedResponse();
920 return _channel.getResponse();
921 }
922
923 void runInContext(AsyncContextEvent event,Runnable runnable)
924 {
925 ContextHandler contextHandler = getContextHandler(event);
926 if (contextHandler==null)
927 runnable.run();
928 else
929 contextHandler.handle(_channel.getRequest(),runnable);
930 }
931
932 public Object getAttribute(String name)
933 {
934 return _channel.getRequest().getAttribute(name);
935 }
936
937 public void removeAttribute(String name)
938 {
939 _channel.getRequest().removeAttribute(name);
940 }
941
942 public void setAttribute(String name, Object attribute)
943 {
944 _channel.getRequest().setAttribute(name,attribute);
945 }
946
947
948
949
950
951
952
953
954
955 public void onReadUnready()
956 {
957 boolean interested=false;
958 try(Locker.Lock lock= _locker.lock())
959 {
960
961 if (!_asyncReadUnready)
962 {
963 _asyncReadUnready=true;
964 _asyncReadPossible=false;
965 if (_state==State.ASYNC_WAIT)
966 interested=true;
967 }
968 }
969
970 if (interested)
971 _channel.asyncReadFillInterested();
972 }
973
974
975
976
977
978
979
980
981 public boolean onReadPossible()
982 {
983 boolean woken=false;
984 try(Locker.Lock lock= _locker.lock())
985 {
986 _asyncReadPossible=true;
987 if (_state==State.ASYNC_WAIT && _asyncReadUnready)
988 {
989 woken=true;
990 _state=State.ASYNC_WOKEN;
991 }
992 }
993 return woken;
994 }
995
996
997
998
999
1000
1001
1002
1003 public boolean onReadReady()
1004 {
1005 boolean woken=false;
1006 try(Locker.Lock lock= _locker.lock())
1007 {
1008 _asyncReadUnready=true;
1009 _asyncReadPossible=true;
1010 if (_state==State.ASYNC_WAIT)
1011 {
1012 woken=true;
1013 _state=State.ASYNC_WOKEN;
1014 }
1015 }
1016 return woken;
1017 }
1018
1019 public boolean isReadPossible()
1020 {
1021 try(Locker.Lock lock= _locker.lock())
1022 {
1023 return _asyncReadPossible;
1024 }
1025 }
1026
1027 public boolean onWritePossible()
1028 {
1029 boolean handle=false;
1030
1031 try(Locker.Lock lock= _locker.lock())
1032 {
1033 _asyncWrite=true;
1034 if (_state==State.ASYNC_WAIT)
1035 {
1036 _state=State.ASYNC_WOKEN;
1037 handle=true;
1038 }
1039 }
1040
1041 return handle;
1042 }
1043
1044 }