1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.http2;
20
21 import java.io.IOException;
22 import java.nio.channels.ClosedChannelException;
23 import java.nio.charset.StandardCharsets;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.TimeoutException;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import org.eclipse.jetty.http2.api.Session;
36 import org.eclipse.jetty.http2.api.Stream;
37 import org.eclipse.jetty.http2.frames.DataFrame;
38 import org.eclipse.jetty.http2.frames.DisconnectFrame;
39 import org.eclipse.jetty.http2.frames.Frame;
40 import org.eclipse.jetty.http2.frames.FrameType;
41 import org.eclipse.jetty.http2.frames.GoAwayFrame;
42 import org.eclipse.jetty.http2.frames.HeadersFrame;
43 import org.eclipse.jetty.http2.frames.PingFrame;
44 import org.eclipse.jetty.http2.frames.PriorityFrame;
45 import org.eclipse.jetty.http2.frames.PushPromiseFrame;
46 import org.eclipse.jetty.http2.frames.ResetFrame;
47 import org.eclipse.jetty.http2.frames.SettingsFrame;
48 import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
49 import org.eclipse.jetty.http2.generator.Generator;
50 import org.eclipse.jetty.http2.parser.Parser;
51 import org.eclipse.jetty.io.ByteBufferPool;
52 import org.eclipse.jetty.io.EndPoint;
53 import org.eclipse.jetty.util.Atomics;
54 import org.eclipse.jetty.util.Callback;
55 import org.eclipse.jetty.util.CountingCallback;
56 import org.eclipse.jetty.util.Promise;
57 import org.eclipse.jetty.util.annotation.ManagedAttribute;
58 import org.eclipse.jetty.util.annotation.ManagedObject;
59 import org.eclipse.jetty.util.component.ContainerLifeCycle;
60 import org.eclipse.jetty.util.log.Log;
61 import org.eclipse.jetty.util.log.Logger;
62 import org.eclipse.jetty.util.thread.Scheduler;
63
64 @ManagedObject
65 public abstract class HTTP2Session extends ContainerLifeCycle implements ISession, Parser.Listener
66 {
67 private static final Logger LOG = Log.getLogger(HTTP2Session.class);
68
69 private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
70 private final AtomicInteger streamIds = new AtomicInteger();
71 private final AtomicInteger lastStreamId = new AtomicInteger();
72 private final AtomicInteger localStreamCount = new AtomicInteger();
73 private final AtomicInteger remoteStreamCount = new AtomicInteger();
74 private final AtomicInteger sendWindow = new AtomicInteger();
75 private final AtomicInteger recvWindow = new AtomicInteger();
76 private final AtomicReference<CloseState> closed = new AtomicReference<>(CloseState.NOT_CLOSED);
77 private final Scheduler scheduler;
78 private final EndPoint endPoint;
79 private final Generator generator;
80 private final Session.Listener listener;
81 private final FlowControlStrategy flowControl;
82 private final HTTP2Flusher flusher;
83 private int maxLocalStreams;
84 private int maxRemoteStreams;
85 private long streamIdleTimeout;
86 private boolean pushEnabled;
87
/**
 * Creates a session over the given endpoint.
 *
 * @param scheduler the scheduler passed to the streams created by this session
 * @param endPoint the endpoint of the underlying connection; its idle timeout
 * becomes the initial stream idle timeout
 * @param generator the generator used to serialize outgoing frames
 * @param listener the listener notified of session events
 * @param flowControl the flow control strategy for the session and its streams
 * @param initialStreamId the id given to the first locally created stream
 */
public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl, int initialStreamId)
{
    // Collaborators first: the flusher receives "this" and must be built after them.
    this.scheduler = scheduler;
    this.endPoint = endPoint;
    this.generator = generator;
    this.listener = listener;
    this.flowControl = flowControl;
    this.flusher = new HTTP2Flusher(this);

    // Stream accounting: no concurrency limits until configured or negotiated.
    this.streamIds.set(initialStreamId);
    this.maxLocalStreams = -1;
    this.maxRemoteStreams = -1;
    this.streamIdleTimeout = endPoint.getIdleTimeout();

    // Protocol defaults.
    this.sendWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
    this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
    this.pushEnabled = true;
}
104
105 @Override
106 protected void doStart() throws Exception
107 {
108 addBean(flowControl);
109 super.doStart();
110 }
111
112 @ManagedAttribute(value = "The flow control strategy", readonly = true)
113 public FlowControlStrategy getFlowControlStrategy()
114 {
115 return flowControl;
116 }
117
118 public int getMaxLocalStreams()
119 {
120 return maxLocalStreams;
121 }
122
123 public void setMaxLocalStreams(int maxLocalStreams)
124 {
125 this.maxLocalStreams = maxLocalStreams;
126 }
127
128 public int getMaxRemoteStreams()
129 {
130 return maxRemoteStreams;
131 }
132
133 public void setMaxRemoteStreams(int maxRemoteStreams)
134 {
135 this.maxRemoteStreams = maxRemoteStreams;
136 }
137
138 @ManagedAttribute("The stream's idle timeout")
139 public long getStreamIdleTimeout()
140 {
141 return streamIdleTimeout;
142 }
143
144 public void setStreamIdleTimeout(long streamIdleTimeout)
145 {
146 this.streamIdleTimeout = streamIdleTimeout;
147 }
148
149 public EndPoint getEndPoint()
150 {
151 return endPoint;
152 }
153
154 public Generator getGenerator()
155 {
156 return generator;
157 }
158
159 @Override
160 public void onData(final DataFrame frame)
161 {
162 if (LOG.isDebugEnabled())
163 LOG.debug("Received {}", frame);
164
165 int streamId = frame.getStreamId();
166 final IStream stream = getStream(streamId);
167
168
169
170 final int flowControlLength = frame.remaining() + frame.padding();
171 flowControl.onDataReceived(this, stream, flowControlLength);
172
173 if (stream != null)
174 {
175 if (getRecvWindow() < 0)
176 {
177 close(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", Callback.NOOP);
178 }
179 else
180 {
181 stream.process(frame, new Callback()
182 {
183 @Override
184 public void succeeded()
185 {
186 flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
187 }
188
189 @Override
190 public void failed(Throwable x)
191 {
192
193
194 flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
195 }
196 });
197 }
198 }
199 else
200 {
201 if (LOG.isDebugEnabled())
202 LOG.debug("Ignoring {}, stream #{} not found", frame, streamId);
203
204
205 flowControl.onDataConsumed(this, null, flowControlLength);
206 }
207 }
208
209 @Override
210 public abstract void onHeaders(HeadersFrame frame);
211
212 @Override
213 public void onPriority(PriorityFrame frame)
214 {
215 if (LOG.isDebugEnabled())
216 LOG.debug("Received {}", frame);
217 }
218
219 @Override
220 public void onReset(ResetFrame frame)
221 {
222 if (LOG.isDebugEnabled())
223 LOG.debug("Received {}", frame);
224
225 IStream stream = getStream(frame.getStreamId());
226 if (stream != null)
227 stream.process(frame, Callback.NOOP);
228 else
229 notifyReset(this, frame);
230 }
231
232 @Override
233 public void onSettings(SettingsFrame frame)
234 {
235
236 onSettings(frame, true);
237 }
238
239 public void onSettings(SettingsFrame frame, boolean reply)
240 {
241 if (LOG.isDebugEnabled())
242 LOG.debug("Received {}", frame);
243
244 if (frame.isReply())
245 return;
246
247
248 for (Map.Entry<Integer, Integer> entry : frame.getSettings().entrySet())
249 {
250 int key = entry.getKey();
251 int value = entry.getValue();
252 switch (key)
253 {
254 case SettingsFrame.HEADER_TABLE_SIZE:
255 {
256 if (LOG.isDebugEnabled())
257 LOG.debug("Update HPACK header table size to {}", value);
258 generator.setHeaderTableSize(value);
259 break;
260 }
261 case SettingsFrame.ENABLE_PUSH:
262 {
263
264 if (value != 0 && value != 1)
265 {
266 onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_enable_push");
267 return;
268 }
269 pushEnabled = value == 1;
270 break;
271 }
272 case SettingsFrame.MAX_CONCURRENT_STREAMS:
273 {
274 maxLocalStreams = value;
275 if (LOG.isDebugEnabled())
276 LOG.debug("Update max local concurrent streams to {}", maxLocalStreams);
277 break;
278 }
279 case SettingsFrame.INITIAL_WINDOW_SIZE:
280 {
281 if (LOG.isDebugEnabled())
282 LOG.debug("Update initial window size to {}", value);
283 flowControl.updateInitialStreamWindow(this, value, false);
284 break;
285 }
286 case SettingsFrame.MAX_FRAME_SIZE:
287 {
288 if (LOG.isDebugEnabled())
289 LOG.debug("Update max frame size to {}", value);
290
291 if (value < Frame.DEFAULT_MAX_LENGTH || value > Frame.MAX_MAX_LENGTH)
292 {
293 onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_max_frame_size");
294 return;
295 }
296 generator.setMaxFrameSize(value);
297 break;
298 }
299 case SettingsFrame.MAX_HEADER_LIST_SIZE:
300 {
301
302 LOG.warn("NOT IMPLEMENTED max header list size to {}", value);
303 break;
304 }
305 default:
306 {
307 LOG.debug("Unknown setting {}:{}", key, value);
308 break;
309 }
310 }
311 }
312 notifySettings(this, frame);
313
314 if (reply)
315 {
316 SettingsFrame replyFrame = new SettingsFrame(Collections.<Integer, Integer>emptyMap(), true);
317 settings(replyFrame, Callback.NOOP);
318 }
319 }
320
321 @Override
322 public void onPing(PingFrame frame)
323 {
324 if (LOG.isDebugEnabled())
325 LOG.debug("Received {}", frame);
326
327 if (frame.isReply())
328 {
329 notifyPing(this, frame);
330 }
331 else
332 {
333 PingFrame reply = new PingFrame(frame.getPayload(), true);
334 control(null, Callback.NOOP, reply);
335 }
336 }
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355 @Override
356 public void onGoAway(final GoAwayFrame frame)
357 {
358 if (LOG.isDebugEnabled())
359 LOG.debug("Received {}", frame);
360
361 while (true)
362 {
363 CloseState current = closed.get();
364 switch (current)
365 {
366 case NOT_CLOSED:
367 {
368 if (closed.compareAndSet(current, CloseState.REMOTELY_CLOSED))
369 {
370
371
372 control(null, new Callback()
373 {
374 @Override
375 public void succeeded()
376 {
377 notifyClose(HTTP2Session.this, frame);
378 }
379
380 @Override
381 public void failed(Throwable x)
382 {
383 notifyClose(HTTP2Session.this, frame);
384 }
385 }, new DisconnectFrame());
386 return;
387 }
388 break;
389 }
390 default:
391 {
392 if (LOG.isDebugEnabled())
393 LOG.debug("Ignored {}, already closed", frame);
394 return;
395 }
396 }
397 }
398 }
399
400 @Override
401 public void onWindowUpdate(WindowUpdateFrame frame)
402 {
403 if (LOG.isDebugEnabled())
404 LOG.debug("Received {}", frame);
405
406 int streamId = frame.getStreamId();
407 if (streamId > 0)
408 {
409 IStream stream = getStream(streamId);
410 if (stream != null)
411 onWindowUpdate(stream, frame);
412 }
413 else
414 {
415 onWindowUpdate(null, frame);
416 }
417 }
418
419 @Override
420 public void onConnectionFailure(int error, String reason)
421 {
422 close(error, reason, Callback.NOOP);
423 notifyFailure(this, new IOException(String.format("%d/%s", error, reason)));
424 }
425
426 @Override
427 public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
428 {
429
430
431 boolean queued;
432 synchronized (this)
433 {
434 int streamId = frame.getStreamId();
435 if (streamId <= 0)
436 {
437 streamId = streamIds.getAndAdd(2);
438 PriorityFrame priority = frame.getPriority();
439 priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(),
440 priority.getWeight(), priority.isExclusive());
441 frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
442 }
443 final IStream stream = createLocalStream(streamId, promise);
444 if (stream == null)
445 return;
446 stream.setListener(listener);
447
448 ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream));
449 queued = flusher.append(entry);
450 }
451
452 if (queued)
453 flusher.iterate();
454 }
455
456 @Override
457 public int priority(PriorityFrame frame, Callback callback)
458 {
459 int streamId = frame.getStreamId();
460 IStream stream = streams.get(streamId);
461 if (stream == null)
462 {
463 streamId = streamIds.getAndAdd(2);
464 frame = new PriorityFrame(streamId, frame.getParentStreamId(),
465 frame.getWeight(), frame.isExclusive());
466 }
467 control(stream, callback, frame);
468 return streamId;
469 }
470
471 @Override
472 public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame, Stream.Listener listener)
473 {
474
475
476 boolean queued;
477 synchronized (this)
478 {
479 int streamId = streamIds.getAndAdd(2);
480 frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
481
482 final IStream pushStream = createLocalStream(streamId, promise);
483 if (pushStream == null)
484 return;
485 pushStream.setListener(listener);
486
487 ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream));
488 queued = flusher.append(entry);
489 }
490
491 if (queued)
492 flusher.iterate();
493 }
494
495
496 @Override
497 public void settings(SettingsFrame frame, Callback callback)
498 {
499 control(null, callback, frame);
500 }
501
502 @Override
503 public void ping(PingFrame frame, Callback callback)
504 {
505 if (frame.isReply())
506 callback.failed(new IllegalArgumentException());
507 else
508 control(null, callback, frame);
509 }
510
511 protected void reset(ResetFrame frame, Callback callback)
512 {
513 control(getStream(frame.getStreamId()), callback, frame);
514 }
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539 @Override
540 public boolean close(int error, String reason, Callback callback)
541 {
542 while (true)
543 {
544 CloseState current = closed.get();
545 switch (current)
546 {
547 case NOT_CLOSED:
548 {
549 if (closed.compareAndSet(current, CloseState.LOCALLY_CLOSED))
550 {
551 byte[] payload = null;
552 if (reason != null)
553 {
554
555 reason = reason.substring(0, Math.min(reason.length(), 32));
556 payload = reason.getBytes(StandardCharsets.UTF_8);
557 }
558 GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload);
559 control(null, callback, frame);
560 return true;
561 }
562 break;
563 }
564 default:
565 {
566 if (LOG.isDebugEnabled())
567 LOG.debug("Ignoring close {}/{}, already closed", error, reason);
568 callback.succeeded();
569 return false;
570 }
571 }
572 }
573 }
574
575 @Override
576 public boolean isClosed()
577 {
578 return closed.get() != CloseState.NOT_CLOSED;
579 }
580
581 private void control(IStream stream, Callback callback, Frame frame)
582 {
583 frames(stream, callback, frame, Frame.EMPTY_ARRAY);
584 }
585
586 @Override
587 public void frames(IStream stream, Callback callback, Frame frame, Frame... frames)
588 {
589
590
591
592
593
594 int length = frames.length;
595 if (length == 0)
596 {
597 frame(new ControlEntry(frame, stream, callback), true);
598 }
599 else
600 {
601 callback = new CountingCallback(callback, 1 + length);
602 frame(new ControlEntry(frame, stream, callback), false);
603 for (int i = 1; i <= length; ++i)
604 frame(new ControlEntry(frames[i - 1], stream, callback), i == length);
605 }
606 }
607
608 @Override
609 public void data(IStream stream, Callback callback, DataFrame frame)
610 {
611
612 frame(new DataEntry(frame, stream, callback), true);
613 }
614
615 private void frame(HTTP2Flusher.Entry entry, boolean flush)
616 {
617 if (LOG.isDebugEnabled())
618 LOG.debug("Sending {}", entry.frame);
619
620 boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry);
621 if (queued && flush)
622 flusher.iterate();
623 }
624
625 protected IStream createLocalStream(int streamId, Promise<Stream> promise)
626 {
627 while (true)
628 {
629 int localCount = localStreamCount.get();
630 int maxCount = maxLocalStreams;
631 if (maxCount >= 0 && localCount >= maxCount)
632 {
633 promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
634 return null;
635 }
636 if (localStreamCount.compareAndSet(localCount, localCount + 1))
637 break;
638 }
639
640 IStream stream = newStream(streamId, true);
641 if (streams.putIfAbsent(streamId, stream) == null)
642 {
643 stream.setIdleTimeout(getStreamIdleTimeout());
644 flowControl.onStreamCreated(stream);
645 if (LOG.isDebugEnabled())
646 LOG.debug("Created local {}", stream);
647 return stream;
648 }
649 else
650 {
651 promise.failed(new IllegalStateException("Duplicate stream " + streamId));
652 return null;
653 }
654 }
655
656 protected IStream createRemoteStream(int streamId)
657 {
658
659 while (true)
660 {
661 int remoteCount = remoteStreamCount.get();
662 int maxCount = getMaxRemoteStreams();
663 if (maxCount >= 0 && remoteCount >= maxCount)
664 {
665 reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP);
666 return null;
667 }
668 if (remoteStreamCount.compareAndSet(remoteCount, remoteCount + 1))
669 break;
670 }
671
672 IStream stream = newStream(streamId, false);
673
674
675 if (streams.putIfAbsent(streamId, stream) == null)
676 {
677 updateLastStreamId(streamId);
678 stream.setIdleTimeout(getStreamIdleTimeout());
679 flowControl.onStreamCreated(stream);
680 if (LOG.isDebugEnabled())
681 LOG.debug("Created remote {}", stream);
682 return stream;
683 }
684 else
685 {
686 close(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream", Callback.NOOP);
687 return null;
688 }
689 }
690
691 protected IStream newStream(int streamId, boolean local)
692 {
693 return new HTTP2Stream(scheduler, this, streamId, local);
694 }
695
696 @Override
697 public void removeStream(IStream stream)
698 {
699 IStream removed = streams.remove(stream.getId());
700 if (removed != null)
701 {
702 assert removed == stream;
703
704 boolean local = stream.isLocal();
705 if (local)
706 localStreamCount.decrementAndGet();
707 else
708 remoteStreamCount.decrementAndGet();
709
710 flowControl.onStreamDestroyed(stream);
711
712 if (LOG.isDebugEnabled())
713 LOG.debug("Removed {} {}", local ? "local" : "remote", stream);
714 }
715 }
716
717 @Override
718 public Collection<Stream> getStreams()
719 {
720 List<Stream> result = new ArrayList<>();
721 result.addAll(streams.values());
722 return result;
723 }
724
725 @ManagedAttribute("The number of active streams")
726 public int getStreamCount()
727 {
728 return streams.size();
729 }
730
731 @Override
732 public IStream getStream(int streamId)
733 {
734 return streams.get(streamId);
735 }
736
737 @ManagedAttribute(value = "The flow control send window", readonly = true)
738 public int getSendWindow()
739 {
740 return sendWindow.get();
741 }
742
743 @ManagedAttribute(value = "The flow control receive window", readonly = true)
744 public int getRecvWindow()
745 {
746 return recvWindow.get();
747 }
748
749 @Override
750 public int updateSendWindow(int delta)
751 {
752 return sendWindow.getAndAdd(delta);
753 }
754
755 @Override
756 public int updateRecvWindow(int delta)
757 {
758 return recvWindow.getAndAdd(delta);
759 }
760
761 @Override
762 public void onWindowUpdate(IStream stream, WindowUpdateFrame frame)
763 {
764
765
766
767
768
769
770
771
772
773 flusher.window(stream, frame);
774 }
775
776 @Override
777 @ManagedAttribute(value = "Whether HTTP/2 push is enabled", readonly = true)
778 public boolean isPushEnabled()
779 {
780 return pushEnabled;
781 }
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805 @Override
806 public void onShutdown()
807 {
808 if (LOG.isDebugEnabled())
809 LOG.debug("Shutting down {}", this);
810
811 switch (closed.get())
812 {
813 case NOT_CLOSED:
814 {
815
816 if (LOG.isDebugEnabled())
817 LOG.debug("Abrupt close for {}", this);
818 abort(new ClosedChannelException());
819 break;
820 }
821 case LOCALLY_CLOSED:
822 {
823
824
825 control(null, Callback.NOOP, new DisconnectFrame());
826 break;
827 }
828 case REMOTELY_CLOSED:
829 {
830
831
832 break;
833 }
834 default:
835 {
836 break;
837 }
838 }
839 }
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862 @Override
863 public boolean onIdleTimeout()
864 {
865 switch (closed.get())
866 {
867 case NOT_CLOSED:
868 {
869 return notifyIdleTimeout(this);
870 }
871 case LOCALLY_CLOSED:
872 case REMOTELY_CLOSED:
873 {
874 abort(new TimeoutException());
875 return false;
876 }
877 default:
878 {
879 return false;
880 }
881 }
882 }
883
884 @Override
885 public void onFrame(Frame frame)
886 {
887 onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "upgrade");
888 }
889
890 public void disconnect()
891 {
892 if (LOG.isDebugEnabled())
893 LOG.debug("Disconnecting {}", this);
894 endPoint.close();
895 }
896
897 private void terminate()
898 {
899 while (true)
900 {
901 CloseState current = closed.get();
902 switch (current)
903 {
904 case NOT_CLOSED:
905 case LOCALLY_CLOSED:
906 case REMOTELY_CLOSED:
907 {
908 if (closed.compareAndSet(current, CloseState.CLOSED))
909 {
910 flusher.close();
911 for (IStream stream : streams.values())
912 stream.close();
913 streams.clear();
914 disconnect();
915 return;
916 }
917 break;
918 }
919 default:
920 {
921 return;
922 }
923 }
924 }
925 }
926
927 protected void abort(Throwable failure)
928 {
929 terminate();
930 notifyFailure(this, failure);
931 }
932
933 public boolean isDisconnected()
934 {
935 return !endPoint.isOpen();
936 }
937
938 private void updateLastStreamId(int streamId)
939 {
940 Atomics.updateMax(lastStreamId, streamId);
941 }
942
943 protected Stream.Listener notifyNewStream(Stream stream, HeadersFrame frame)
944 {
945 try
946 {
947 return listener.onNewStream(stream, frame);
948 }
949 catch (Throwable x)
950 {
951 LOG.info("Failure while notifying listener " + listener, x);
952 return null;
953 }
954 }
955
956 protected void notifySettings(Session session, SettingsFrame frame)
957 {
958 try
959 {
960 listener.onSettings(session, frame);
961 }
962 catch (Throwable x)
963 {
964 LOG.info("Failure while notifying listener " + listener, x);
965 }
966 }
967
968 protected void notifyPing(Session session, PingFrame frame)
969 {
970 try
971 {
972 listener.onPing(session, frame);
973 }
974 catch (Throwable x)
975 {
976 LOG.info("Failure while notifying listener " + listener, x);
977 }
978 }
979
980 protected void notifyReset(Session session, ResetFrame frame)
981 {
982 try
983 {
984 listener.onReset(session, frame);
985 }
986 catch (Throwable x)
987 {
988 LOG.info("Failure while notifying listener " + listener, x);
989 }
990 }
991
992 protected void notifyClose(Session session, GoAwayFrame frame)
993 {
994 try
995 {
996 listener.onClose(session, frame);
997 }
998 catch (Throwable x)
999 {
1000 LOG.info("Failure while notifying listener " + listener, x);
1001 }
1002 }
1003
1004 protected boolean notifyIdleTimeout(Session session)
1005 {
1006 try
1007 {
1008 return listener.onIdleTimeout(session);
1009 }
1010 catch (Throwable x)
1011 {
1012 LOG.info("Failure while notifying listener " + listener, x);
1013 return true;
1014 }
1015 }
1016
1017 protected void notifyFailure(Session session, Throwable failure)
1018 {
1019 try
1020 {
1021 listener.onFailure(session, failure);
1022 }
1023 catch (Throwable x)
1024 {
1025 LOG.info("Failure while notifying listener " + listener, x);
1026 }
1027 }
1028
1029 @Override
1030 public String toString()
1031 {
1032 return String.format("%s@%x{l:%s <-> r:%s,queueSize=%d,sendWindow=%s,recvWindow=%s,streams=%d,%s}",
1033 getClass().getSimpleName(),
1034 hashCode(),
1035 getEndPoint().getLocalAddress(),
1036 getEndPoint().getRemoteAddress(),
1037 flusher.getQueueSize(),
1038 sendWindow,
1039 recvWindow,
1040 streams.size(),
1041 closed);
1042 }
1043
1044 private class ControlEntry extends HTTP2Flusher.Entry
1045 {
1046 private ControlEntry(Frame frame, IStream stream, Callback callback)
1047 {
1048 super(frame, stream, callback);
1049 }
1050
1051 public Throwable generate(ByteBufferPool.Lease lease)
1052 {
1053 try
1054 {
1055 generator.control(lease, frame);
1056 if (LOG.isDebugEnabled())
1057 LOG.debug("Generated {}", frame);
1058 prepare();
1059 return null;
1060 }
1061 catch (Throwable x)
1062 {
1063 if (LOG.isDebugEnabled())
1064 LOG.debug("Failure generating frame " + frame, x);
1065 return x;
1066 }
1067 }
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082 private void prepare()
1083 {
1084 switch (frame.getType())
1085 {
1086 case SETTINGS:
1087 {
1088 SettingsFrame settingsFrame = (SettingsFrame)frame;
1089 Integer initialWindow = settingsFrame.getSettings().get(SettingsFrame.INITIAL_WINDOW_SIZE);
1090 if (initialWindow != null)
1091 flowControl.updateInitialStreamWindow(HTTP2Session.this, initialWindow, true);
1092 break;
1093 }
1094 default:
1095 {
1096 break;
1097 }
1098 }
1099 }
1100
1101 @Override
1102 public void succeeded()
1103 {
1104 switch (frame.getType())
1105 {
1106 case HEADERS:
1107 {
1108 HeadersFrame headersFrame = (HeadersFrame)frame;
1109 if (stream.updateClose(headersFrame.isEndStream(), true))
1110 removeStream(stream);
1111 break;
1112 }
1113 case RST_STREAM:
1114 {
1115 if (stream != null)
1116 {
1117 stream.close();
1118 removeStream(stream);
1119 }
1120 break;
1121 }
1122 case PUSH_PROMISE:
1123 {
1124
1125
1126 stream.updateClose(true, false);
1127 break;
1128 }
1129 case GO_AWAY:
1130 {
1131
1132
1133 getEndPoint().shutdownOutput();
1134 break;
1135 }
1136 case WINDOW_UPDATE:
1137 {
1138 flowControl.windowUpdate(HTTP2Session.this, stream, (WindowUpdateFrame)frame);
1139 break;
1140 }
1141 case DISCONNECT:
1142 {
1143 terminate();
1144 break;
1145 }
1146 default:
1147 {
1148 break;
1149 }
1150 }
1151 callback.succeeded();
1152 }
1153 }
1154
1155 private class DataEntry extends HTTP2Flusher.Entry
1156 {
1157 private int length;
1158
1159 private DataEntry(DataFrame frame, IStream stream, Callback callback)
1160 {
1161 super(frame, stream, callback);
1162 }
1163
1164 @Override
1165 public int dataRemaining()
1166 {
1167
1168
1169
1170
1171
1172 return ((DataFrame)frame).remaining();
1173 }
1174
1175 public Throwable generate(ByteBufferPool.Lease lease)
1176 {
1177 try
1178 {
1179 int flowControlLength = dataRemaining();
1180
1181 int sessionSendWindow = getSendWindow();
1182 if (sessionSendWindow < 0)
1183 throw new IllegalStateException();
1184
1185 int streamSendWindow = stream.updateSendWindow(0);
1186 if (streamSendWindow < 0)
1187 throw new IllegalStateException();
1188
1189 int window = Math.min(streamSendWindow, sessionSendWindow);
1190
1191 int length = this.length = Math.min(flowControlLength, window);
1192 if (LOG.isDebugEnabled())
1193 LOG.debug("Generated {}, length/window={}/{}", frame, length, window);
1194
1195 generator.data(lease, (DataFrame)frame, length);
1196 flowControl.onDataSending(stream, length);
1197 return null;
1198 }
1199 catch (Throwable x)
1200 {
1201 if (LOG.isDebugEnabled())
1202 LOG.debug("Failure generating frame " + frame, x);
1203 return x;
1204 }
1205 }
1206
1207 @Override
1208 public void succeeded()
1209 {
1210 flowControl.onDataSent(stream, length);
1211
1212 DataFrame dataFrame = (DataFrame)frame;
1213 if (dataFrame.remaining() > 0)
1214 {
1215
1216
1217
1218 flusher.prepend(this);
1219 }
1220 else
1221 {
1222
1223
1224 if (stream.updateClose(dataFrame.isEndStream(), true))
1225 removeStream(stream);
1226 callback.succeeded();
1227 }
1228 }
1229 }
1230
1231 private static class PromiseCallback<C> implements Callback
1232 {
1233 private final Promise<C> promise;
1234 private final C value;
1235
1236 private PromiseCallback(Promise<C> promise, C value)
1237 {
1238 this.promise = promise;
1239 this.value = value;
1240 }
1241
1242 @Override
1243 public void succeeded()
1244 {
1245 promise.succeeded(value);
1246 }
1247
1248 @Override
1249 public void failed(Throwable x)
1250 {
1251 promise.failed(x);
1252 }
1253 }
1254 }