View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2;
20  
21  import java.io.IOException;
22  import java.nio.channels.ClosedChannelException;
23  import java.nio.charset.StandardCharsets;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.ConcurrentMap;
31  import java.util.concurrent.TimeoutException;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicReference;
34  
35  import org.eclipse.jetty.http2.api.Session;
36  import org.eclipse.jetty.http2.api.Stream;
37  import org.eclipse.jetty.http2.frames.DataFrame;
38  import org.eclipse.jetty.http2.frames.DisconnectFrame;
39  import org.eclipse.jetty.http2.frames.Frame;
40  import org.eclipse.jetty.http2.frames.FrameType;
41  import org.eclipse.jetty.http2.frames.GoAwayFrame;
42  import org.eclipse.jetty.http2.frames.HeadersFrame;
43  import org.eclipse.jetty.http2.frames.PingFrame;
44  import org.eclipse.jetty.http2.frames.PriorityFrame;
45  import org.eclipse.jetty.http2.frames.PushPromiseFrame;
46  import org.eclipse.jetty.http2.frames.ResetFrame;
47  import org.eclipse.jetty.http2.frames.SettingsFrame;
48  import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
49  import org.eclipse.jetty.http2.generator.Generator;
50  import org.eclipse.jetty.http2.parser.Parser;
51  import org.eclipse.jetty.io.ByteBufferPool;
52  import org.eclipse.jetty.io.EndPoint;
53  import org.eclipse.jetty.util.Atomics;
54  import org.eclipse.jetty.util.Callback;
55  import org.eclipse.jetty.util.CountingCallback;
56  import org.eclipse.jetty.util.Promise;
57  import org.eclipse.jetty.util.annotation.ManagedAttribute;
58  import org.eclipse.jetty.util.annotation.ManagedObject;
59  import org.eclipse.jetty.util.component.ContainerLifeCycle;
60  import org.eclipse.jetty.util.log.Log;
61  import org.eclipse.jetty.util.log.Logger;
62  import org.eclipse.jetty.util.thread.Scheduler;
63  
64  @ManagedObject
65  public abstract class HTTP2Session extends ContainerLifeCycle implements ISession, Parser.Listener
66  {
67      private static final Logger LOG = Log.getLogger(HTTP2Session.class);
68  
69      private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
70      private final AtomicInteger streamIds = new AtomicInteger();
71      private final AtomicInteger lastStreamId = new AtomicInteger();
72      private final AtomicInteger localStreamCount = new AtomicInteger();
73      private final AtomicInteger remoteStreamCount = new AtomicInteger();
74      private final AtomicInteger sendWindow = new AtomicInteger();
75      private final AtomicInteger recvWindow = new AtomicInteger();
76      private final AtomicReference<CloseState> closed = new AtomicReference<>(CloseState.NOT_CLOSED);
77      private final Scheduler scheduler;
78      private final EndPoint endPoint;
79      private final Generator generator;
80      private final Session.Listener listener;
81      private final FlowControlStrategy flowControl;
82      private final HTTP2Flusher flusher;
83      private int maxLocalStreams;
84      private int maxRemoteStreams;
85      private long streamIdleTimeout;
86      private boolean pushEnabled;
87  
88      public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl, int initialStreamId)
89      {
90          this.scheduler = scheduler;
91          this.endPoint = endPoint;
92          this.generator = generator;
93          this.listener = listener;
94          this.flowControl = flowControl;
95          this.flusher = new HTTP2Flusher(this);
96          this.maxLocalStreams = -1;
97          this.maxRemoteStreams = -1;
98          this.streamIds.set(initialStreamId);
99          this.streamIdleTimeout = endPoint.getIdleTimeout();
100         this.sendWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
101         this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
102         this.pushEnabled = true; // SPEC: by default, push is enabled.
103     }
104 
105     @Override
106     protected void doStart() throws Exception
107     {
108         addBean(flowControl);
109         super.doStart();
110     }
111 
112     @ManagedAttribute(value = "The flow control strategy", readonly = true)
113     public FlowControlStrategy getFlowControlStrategy()
114     {
115         return flowControl;
116     }
117 
118     public int getMaxLocalStreams()
119     {
120         return maxLocalStreams;
121     }
122 
123     public void setMaxLocalStreams(int maxLocalStreams)
124     {
125         this.maxLocalStreams = maxLocalStreams;
126     }
127 
128     public int getMaxRemoteStreams()
129     {
130         return maxRemoteStreams;
131     }
132 
133     public void setMaxRemoteStreams(int maxRemoteStreams)
134     {
135         this.maxRemoteStreams = maxRemoteStreams;
136     }
137 
138     @ManagedAttribute("The stream's idle timeout")
139     public long getStreamIdleTimeout()
140     {
141         return streamIdleTimeout;
142     }
143 
144     public void setStreamIdleTimeout(long streamIdleTimeout)
145     {
146         this.streamIdleTimeout = streamIdleTimeout;
147     }
148 
149     public EndPoint getEndPoint()
150     {
151         return endPoint;
152     }
153 
154     public Generator getGenerator()
155     {
156         return generator;
157     }
158 
159     @Override
160     public void onData(final DataFrame frame)
161     {
162         if (LOG.isDebugEnabled())
163             LOG.debug("Received {}", frame);
164 
165         int streamId = frame.getStreamId();
166         final IStream stream = getStream(streamId);
167 
168         // SPEC: the session window must be updated even if the stream is null.
169         // The flow control length includes the padding bytes.
170         final int flowControlLength = frame.remaining() + frame.padding();
171         flowControl.onDataReceived(this, stream, flowControlLength);
172 
173         if (stream != null)
174         {
175             if (getRecvWindow() < 0)
176             {
177                 close(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", Callback.NOOP);
178             }
179             else
180             {
181                 stream.process(frame, new Callback()
182                 {
183                     @Override
184                     public void succeeded()
185                     {
186                         flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
187                     }
188 
189                     @Override
190                     public void failed(Throwable x)
191                     {
192                         // Consume also in case of failures, to free the
193                         // session flow control window for other streams.
194                         flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
195                     }
196                 });
197             }
198         }
199         else
200         {
201             if (LOG.isDebugEnabled())
202                 LOG.debug("Ignoring {}, stream #{} not found", frame, streamId);
203             // We must enlarge the session flow control window,
204             // otherwise other requests will be stalled.
205             flowControl.onDataConsumed(this, null, flowControlLength);
206         }
207     }
208 
209     @Override
210     public abstract void onHeaders(HeadersFrame frame);
211 
212     @Override
213     public void onPriority(PriorityFrame frame)
214     {
215         if (LOG.isDebugEnabled())
216             LOG.debug("Received {}", frame);
217     }
218 
219     @Override
220     public void onReset(ResetFrame frame)
221     {
222         if (LOG.isDebugEnabled())
223             LOG.debug("Received {}", frame);
224 
225         IStream stream = getStream(frame.getStreamId());
226         if (stream != null)
227             stream.process(frame, Callback.NOOP);
228         else
229             notifyReset(this, frame);
230     }
231 
232     @Override
233     public void onSettings(SettingsFrame frame)
234     {
235         // SPEC: SETTINGS frame MUST be replied.
236         onSettings(frame, true);
237     }
238 
239     public void onSettings(SettingsFrame frame, boolean reply)
240     {
241         if (LOG.isDebugEnabled())
242             LOG.debug("Received {}", frame);
243 
244         if (frame.isReply())
245             return;
246 
247         // Iterate over all settings
248         for (Map.Entry<Integer, Integer> entry : frame.getSettings().entrySet())
249         {
250             int key = entry.getKey();
251             int value = entry.getValue();
252             switch (key)
253             {
254                 case SettingsFrame.HEADER_TABLE_SIZE:
255                 {
256                     if (LOG.isDebugEnabled())
257                         LOG.debug("Update HPACK header table size to {}", value);
258                     generator.setHeaderTableSize(value);
259                     break;
260                 }
261                 case SettingsFrame.ENABLE_PUSH:
262                 {
263                     // SPEC: check the value is sane.
264                     if (value != 0 && value != 1)
265                     {
266                         onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_enable_push");
267                         return;
268                     }
269                     pushEnabled = value == 1;
270                     break;
271                 }
272                 case SettingsFrame.MAX_CONCURRENT_STREAMS:
273                 {
274                     maxLocalStreams = value;
275                     if (LOG.isDebugEnabled())
276                         LOG.debug("Update max local concurrent streams to {}", maxLocalStreams);
277                     break;
278                 }
279                 case SettingsFrame.INITIAL_WINDOW_SIZE:
280                 {
281                     if (LOG.isDebugEnabled())
282                         LOG.debug("Update initial window size to {}", value);
283                     flowControl.updateInitialStreamWindow(this, value, false);
284                     break;
285                 }
286                 case SettingsFrame.MAX_FRAME_SIZE:
287                 {
288                     if (LOG.isDebugEnabled())
289                         LOG.debug("Update max frame size to {}", value);
290                     // SPEC: check the max frame size is sane.
291                     if (value < Frame.DEFAULT_MAX_LENGTH || value > Frame.MAX_MAX_LENGTH)
292                     {
293                         onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_max_frame_size");
294                         return;
295                     }
296                     generator.setMaxFrameSize(value);
297                     break;
298                 }
299                 case SettingsFrame.MAX_HEADER_LIST_SIZE:
300                 {
301                     // TODO implement
302                     LOG.warn("NOT IMPLEMENTED max header list size to {}", value);
303                     break;
304                 }
305                 default:
306                 {
307                     LOG.debug("Unknown setting {}:{}", key, value);
308                     break;
309                 }
310             }
311         }
312         notifySettings(this, frame);
313 
314         if (reply)
315         {
316             SettingsFrame replyFrame = new SettingsFrame(Collections.<Integer, Integer>emptyMap(), true);
317             settings(replyFrame, Callback.NOOP);
318         }
319     }
320 
321     @Override
322     public void onPing(PingFrame frame)
323     {
324         if (LOG.isDebugEnabled())
325             LOG.debug("Received {}", frame);
326 
327         if (frame.isReply())
328         {
329             notifyPing(this, frame);
330         }
331         else
332         {
333             PingFrame reply = new PingFrame(frame.getPayload(), true);
334             control(null, Callback.NOOP, reply);
335         }
336     }
337 
338     /**
339      * This method is called when receiving a GO_AWAY from the other peer.
340      * We check the close state to act appropriately:
341      *
342      * * NOT_CLOSED: we move to REMOTELY_CLOSED and queue a disconnect, so
343      *   that the content of the queue is written, and then the connection
344      *   closed. We notify the application after being terminated.
345      *   See <code>HTTP2Session.ControlEntry#succeeded()</code>
346      *
347      * * In all other cases, we do nothing since other methods are already
348      *   performing their actions.
349      *
350      * @param frame the GO_AWAY frame that has been received.
351      * @see #close(int, String, Callback)
352      * @see #onShutdown()
353      * @see #onIdleTimeout()
354      */
355     @Override
356     public void onGoAway(final GoAwayFrame frame)
357     {
358         if (LOG.isDebugEnabled())
359             LOG.debug("Received {}", frame);
360 
361         while (true)
362         {
363             CloseState current = closed.get();
364             switch (current)
365             {
366                 case NOT_CLOSED:
367                 {
368                     if (closed.compareAndSet(current, CloseState.REMOTELY_CLOSED))
369                     {
370                         // We received a GO_AWAY, so try to write
371                         // what's in the queue and then disconnect.
372                         control(null, new Callback()
373                         {
374                             @Override
375                             public void succeeded()
376                             {
377                                 notifyClose(HTTP2Session.this, frame);
378                             }
379 
380                             @Override
381                             public void failed(Throwable x)
382                             {
383                                 notifyClose(HTTP2Session.this, frame);
384                             }
385                         }, new DisconnectFrame());
386                         return;
387                     }
388                     break;
389                 }
390                 default:
391                 {
392                     if (LOG.isDebugEnabled())
393                         LOG.debug("Ignored {}, already closed", frame);
394                     return;
395                 }
396             }
397         }
398     }
399 
400     @Override
401     public void onWindowUpdate(WindowUpdateFrame frame)
402     {
403         if (LOG.isDebugEnabled())
404             LOG.debug("Received {}", frame);
405 
406         int streamId = frame.getStreamId();
407         if (streamId > 0)
408         {
409             IStream stream = getStream(streamId);
410             if (stream != null)
411                 onWindowUpdate(stream, frame);
412         }
413         else
414         {
415             onWindowUpdate(null, frame);
416         }
417     }
418 
419     @Override
420     public void onConnectionFailure(int error, String reason)
421     {
422         close(error, reason, Callback.NOOP);
423         notifyFailure(this, new IOException(String.format("%d/%s", error, reason)));
424     }
425 
426     @Override
427     public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
428     {
429         // Synchronization is necessary to atomically create
430         // the stream id and enqueue the frame to be sent.
431         boolean queued;
432         synchronized (this)
433         {
434             int streamId = frame.getStreamId();
435             if (streamId <= 0)
436             {
437                 streamId = streamIds.getAndAdd(2);
438                 PriorityFrame priority = frame.getPriority();
439                 priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(),
440                         priority.getWeight(), priority.isExclusive());
441                 frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
442             }
443             final IStream stream = createLocalStream(streamId, promise);
444             if (stream == null)
445                 return;
446             stream.setListener(listener);
447 
448             ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream));
449             queued = flusher.append(entry);
450         }
451         // Iterate outside the synchronized block.
452         if (queued)
453             flusher.iterate();
454     }
455 
456     @Override
457     public int priority(PriorityFrame frame, Callback callback)
458     {
459         int streamId = frame.getStreamId();
460         IStream stream = streams.get(streamId);
461         if (stream == null)
462         {
463             streamId = streamIds.getAndAdd(2);
464             frame = new PriorityFrame(streamId, frame.getParentStreamId(),
465                     frame.getWeight(), frame.isExclusive());
466         }
467         control(stream, callback, frame);
468         return streamId;
469     }
470 
471     @Override
472     public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame, Stream.Listener listener)
473     {
474         // Synchronization is necessary to atomically create
475         // the stream id and enqueue the frame to be sent.
476         boolean queued;
477         synchronized (this)
478         {
479             int streamId = streamIds.getAndAdd(2);
480             frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
481 
482             final IStream pushStream = createLocalStream(streamId, promise);
483             if (pushStream == null)
484                 return;
485             pushStream.setListener(listener);
486 
487             ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream));
488             queued = flusher.append(entry);
489         }
490         // Iterate outside the synchronized block.
491         if (queued)
492             flusher.iterate();
493     }
494 
495 
496     @Override
497     public void settings(SettingsFrame frame, Callback callback)
498     {
499         control(null, callback, frame);
500     }
501 
502     @Override
503     public void ping(PingFrame frame, Callback callback)
504     {
505         if (frame.isReply())
506             callback.failed(new IllegalArgumentException());
507         else
508             control(null, callback, frame);
509     }
510 
511     protected void reset(ResetFrame frame, Callback callback)
512     {
513         control(getStream(frame.getStreamId()), callback, frame);
514     }
515 
516     /**
517      * Invoked internally and by applications to send a GO_AWAY frame to the
518      * other peer. We check the close state to act appropriately:
519      *
520      * * NOT_CLOSED: we move to LOCALLY_CLOSED and queue a GO_AWAY. When the
521      *   GO_AWAY has been written, it will only cause the output to be shut
522      *   down (not the connection closed), so that the application can still
523      *   read frames arriving from the other peer.
524      *   Ideally the other peer will notice the GO_AWAY and close the connection.
525      *   When that happen, we close the connection from {@link #onShutdown()}.
526      *   Otherwise, the idle timeout mechanism will close the connection, see
527      *   {@link #onIdleTimeout()}.
528      *
529      * * In all other cases, we do nothing since other methods are already
530      *   performing their actions.
531      *
532      * @param error the error code
533      * @param reason the reason
534      * @param callback the callback to invoke when the operation is complete
535      * @see #onGoAway(GoAwayFrame)
536      * @see #onShutdown()
537      * @see #onIdleTimeout()
538      */
539     @Override
540     public boolean close(int error, String reason, Callback callback)
541     {
542         while (true)
543         {
544             CloseState current = closed.get();
545             switch (current)
546             {
547                 case NOT_CLOSED:
548                 {
549                     if (closed.compareAndSet(current, CloseState.LOCALLY_CLOSED))
550                     {
551                         byte[] payload = null;
552                         if (reason != null)
553                         {
554                             // Trim the reason to avoid attack vectors.
555                             reason = reason.substring(0, Math.min(reason.length(), 32));
556                             payload = reason.getBytes(StandardCharsets.UTF_8);
557                         }
558                         GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload);
559                         control(null, callback, frame);
560                         return true;
561                     }
562                     break;
563                 }
564                 default:
565                 {
566                     if (LOG.isDebugEnabled())
567                         LOG.debug("Ignoring close {}/{}, already closed", error, reason);
568                     callback.succeeded();
569                     return false;
570                 }
571             }
572         }
573     }
574 
575     @Override
576     public boolean isClosed()
577     {
578         return closed.get() != CloseState.NOT_CLOSED;
579     }
580 
581     private void control(IStream stream, Callback callback, Frame frame)
582     {
583         frames(stream, callback, frame, Frame.EMPTY_ARRAY);
584     }
585 
586     @Override
587     public void frames(IStream stream, Callback callback, Frame frame, Frame... frames)
588     {
589         // We want to generate as late as possible to allow re-prioritization;
590         // generation will happen while processing the entries.
591 
592         // The callback needs to be notified only when the last frame completes.
593 
594         int length = frames.length;
595         if (length == 0)
596         {
597             frame(new ControlEntry(frame, stream, callback), true);
598         }
599         else
600         {
601             callback = new CountingCallback(callback, 1 + length);
602             frame(new ControlEntry(frame, stream, callback), false);
603             for (int i = 1; i <= length; ++i)
604                 frame(new ControlEntry(frames[i - 1], stream, callback), i == length);
605         }
606     }
607 
608     @Override
609     public void data(IStream stream, Callback callback, DataFrame frame)
610     {
611         // We want to generate as late as possible to allow re-prioritization.
612         frame(new DataEntry(frame, stream, callback), true);
613     }
614 
615     private void frame(HTTP2Flusher.Entry entry, boolean flush)
616     {
617         if (LOG.isDebugEnabled())
618             LOG.debug("Sending {}", entry.frame);
619         // Ping frames are prepended to process them as soon as possible.
620         boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry);
621         if (queued && flush)
622             flusher.iterate();
623     }
624 
625     protected IStream createLocalStream(int streamId, Promise<Stream> promise)
626     {
627         while (true)
628         {
629             int localCount = localStreamCount.get();
630             int maxCount = maxLocalStreams;
631             if (maxCount >= 0 && localCount >= maxCount)
632             {
633                 promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
634                 return null;
635             }
636             if (localStreamCount.compareAndSet(localCount, localCount + 1))
637                 break;
638         }
639 
640         IStream stream = newStream(streamId, true);
641         if (streams.putIfAbsent(streamId, stream) == null)
642         {
643             stream.setIdleTimeout(getStreamIdleTimeout());
644             flowControl.onStreamCreated(stream);
645             if (LOG.isDebugEnabled())
646                 LOG.debug("Created local {}", stream);
647             return stream;
648         }
649         else
650         {
651             promise.failed(new IllegalStateException("Duplicate stream " + streamId));
652             return null;
653         }
654     }
655 
656     protected IStream createRemoteStream(int streamId)
657     {
658         // SPEC: exceeding max concurrent streams is treated as stream error.
659         while (true)
660         {
661             int remoteCount = remoteStreamCount.get();
662             int maxCount = getMaxRemoteStreams();
663             if (maxCount >= 0 && remoteCount >= maxCount)
664             {
665                 reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP);
666                 return null;
667             }
668             if (remoteStreamCount.compareAndSet(remoteCount, remoteCount + 1))
669                 break;
670         }
671 
672         IStream stream = newStream(streamId, false);
673 
674         // SPEC: duplicate stream is treated as connection error.
675         if (streams.putIfAbsent(streamId, stream) == null)
676         {
677             updateLastStreamId(streamId);
678             stream.setIdleTimeout(getStreamIdleTimeout());
679             flowControl.onStreamCreated(stream);
680             if (LOG.isDebugEnabled())
681                 LOG.debug("Created remote {}", stream);
682             return stream;
683         }
684         else
685         {
686             close(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream", Callback.NOOP);
687             return null;
688         }
689     }
690 
691     protected IStream newStream(int streamId, boolean local)
692     {
693         return new HTTP2Stream(scheduler, this, streamId, local);
694     }
695 
696     @Override
697     public void removeStream(IStream stream)
698     {
699         IStream removed = streams.remove(stream.getId());
700         if (removed != null)
701         {
702             assert removed == stream;
703 
704             boolean local = stream.isLocal();
705             if (local)
706                 localStreamCount.decrementAndGet();
707             else
708                 remoteStreamCount.decrementAndGet();
709 
710             flowControl.onStreamDestroyed(stream);
711 
712             if (LOG.isDebugEnabled())
713                 LOG.debug("Removed {} {}", local ? "local" : "remote", stream);
714         }
715     }
716 
717     @Override
718     public Collection<Stream> getStreams()
719     {
720         List<Stream> result = new ArrayList<>();
721         result.addAll(streams.values());
722         return result;
723     }
724 
725     @ManagedAttribute("The number of active streams")
726     public int getStreamCount()
727     {
728         return streams.size();
729     }
730 
731     @Override
732     public IStream getStream(int streamId)
733     {
734         return streams.get(streamId);
735     }
736 
737     @ManagedAttribute(value = "The flow control send window", readonly = true)
738     public int getSendWindow()
739     {
740         return sendWindow.get();
741     }
742 
743     @ManagedAttribute(value = "The flow control receive window", readonly = true)
744     public int getRecvWindow()
745     {
746         return recvWindow.get();
747     }
748 
749     @Override
750     public int updateSendWindow(int delta)
751     {
752         return sendWindow.getAndAdd(delta);
753     }
754 
755     @Override
756     public int updateRecvWindow(int delta)
757     {
758         return recvWindow.getAndAdd(delta);
759     }
760 
761     @Override
762     public void onWindowUpdate(IStream stream, WindowUpdateFrame frame)
763     {
764         // WindowUpdateFrames arrive concurrently with writes.
765         // Increasing (or reducing) the window size concurrently
766         // with writes requires coordination with the flusher, that
767         // decides how many frames to write depending on the available
768         // window sizes. If the window sizes vary concurrently, the
769         // flusher may take non-optimal or wrong decisions.
770         // Here, we "queue" window updates to the flusher, so it will
771         // be the only component responsible for window updates, for
772         // both increments and reductions.
773         flusher.window(stream, frame);
774     }
775 
776     @Override
777     @ManagedAttribute(value = "Whether HTTP/2 push is enabled", readonly = true)
778     public boolean isPushEnabled()
779     {
780         return pushEnabled;
781     }
782 
783     /**
784      * A typical close by a remote peer involves a GO_AWAY frame followed by TCP FIN.
785      * This method is invoked when the TCP FIN is received, or when an exception is
786      * thrown while reading, and we check the close state to act appropriately:
787      *
788      * * NOT_CLOSED: means that the remote peer did not send a GO_AWAY (abrupt close)
789      *   or there was an exception while reading, and therefore we terminate.
790      *
791      * * LOCALLY_CLOSED: we have sent the GO_AWAY to the remote peer, which received
792      *   it and closed the connection; we queue a disconnect to close the connection
793      *   on the local side.
794      *   The GO_AWAY just shutdown the output, so we need this step to make sure the
795      *   connection is closed. See {@link #close(int, String, Callback)}.
796      *
797      * * REMOTELY_CLOSED: we received the GO_AWAY, and the TCP FIN afterwards, so we
798      *   do nothing since the handling of the GO_AWAY will take care of closing the
799      *   connection. See {@link #onGoAway(GoAwayFrame)}.
800      *
801      * @see #onGoAway(GoAwayFrame)
802      * @see #close(int, String, Callback)
803      * @see #onIdleTimeout()
804      */
805     @Override
806     public void onShutdown()
807     {
808         if (LOG.isDebugEnabled())
809             LOG.debug("Shutting down {}", this);
810 
811         switch (closed.get())
812         {
813             case NOT_CLOSED:
814             {
815                 // The other peer did not send a GO_AWAY, no need to be gentle.
816                 if (LOG.isDebugEnabled())
817                     LOG.debug("Abrupt close for {}", this);
818                 abort(new ClosedChannelException());
819                 break;
820             }
821             case LOCALLY_CLOSED:
822             {
823                 // We have closed locally, and only shutdown
824                 // the output; now queue a disconnect.
825                 control(null, Callback.NOOP, new DisconnectFrame());
826                 break;
827             }
828             case REMOTELY_CLOSED:
829             {
830                 // Nothing to do, the GO_AWAY frame we
831                 // received will close the connection.
832                 break;
833             }
834             default:
835             {
836                 break;
837             }
838         }
839     }
840 
841     /**
842      * This method is invoked when the idle timeout triggers. We check the close state
843      * to act appropriately:
844      *
845      * * NOT_CLOSED: it's a real idle timeout, we just initiate a close, see
846      *   {@link #close(int, String, Callback)}.
847      *
848      * * LOCALLY_CLOSED: we have sent a GO_AWAY and only shutdown the output, but the
849      *   other peer did not close the connection so we never received the TCP FIN, and
850      *   therefore we terminate.
851      *
852      * * REMOTELY_CLOSED: the other peer sent us a GO_AWAY, we should have queued a
853      *   disconnect, but for some reason it was not processed (for example, queue was
854      *   stuck because of TCP congestion), therefore we terminate.
855      *   See {@link #onGoAway(GoAwayFrame)}.
856      *
857      * @return true if the session should be closed, false otherwise
858      * @see #onGoAway(GoAwayFrame)
859      * @see #close(int, String, Callback)
860      * @see #onShutdown()
861      */
862     @Override
863     public boolean onIdleTimeout()
864     {
865         switch (closed.get())
866         {
867             case NOT_CLOSED:
868             {
869                 return notifyIdleTimeout(this);
870             }
871             case LOCALLY_CLOSED:
872             case REMOTELY_CLOSED:
873             {
874                 abort(new TimeoutException());
875                 return false;
876             }
877             default:
878             {
879                 return false;
880             }
881         }
882     }
883 
884     @Override
885     public void onFrame(Frame frame)
886     {
887         onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "upgrade");
888     }
889 
890     public void disconnect()
891     {
892         if (LOG.isDebugEnabled())
893             LOG.debug("Disconnecting {}", this);
894         endPoint.close();
895     }
896 
897     private void terminate()
898     {
899         while (true)
900         {
901             CloseState current = closed.get();
902             switch (current)
903             {
904                 case NOT_CLOSED:
905                 case LOCALLY_CLOSED:
906                 case REMOTELY_CLOSED:
907                 {
908                     if (closed.compareAndSet(current, CloseState.CLOSED))
909                     {
910                         flusher.close();
911                         for (IStream stream : streams.values())
912                             stream.close();
913                         streams.clear();
914                         disconnect();
915                         return;
916                     }
917                     break;
918                 }
919                 default:
920                 {
921                     return;
922                 }
923             }
924         }
925     }
926 
927     protected void abort(Throwable failure)
928     {
929         terminate();
930         notifyFailure(this, failure);
931     }
932 
933     public boolean isDisconnected()
934     {
935         return !endPoint.isOpen();
936     }
937 
938     private void updateLastStreamId(int streamId)
939     {
940         Atomics.updateMax(lastStreamId, streamId);
941     }
942 
943     protected Stream.Listener notifyNewStream(Stream stream, HeadersFrame frame)
944     {
945         try
946         {
947             return listener.onNewStream(stream, frame);
948         }
949         catch (Throwable x)
950         {
951             LOG.info("Failure while notifying listener " + listener, x);
952             return null;
953         }
954     }
955 
956     protected void notifySettings(Session session, SettingsFrame frame)
957     {
958         try
959         {
960             listener.onSettings(session, frame);
961         }
962         catch (Throwable x)
963         {
964             LOG.info("Failure while notifying listener " + listener, x);
965         }
966     }
967 
968     protected void notifyPing(Session session, PingFrame frame)
969     {
970         try
971         {
972             listener.onPing(session, frame);
973         }
974         catch (Throwable x)
975         {
976             LOG.info("Failure while notifying listener " + listener, x);
977         }
978     }
979 
980     protected void notifyReset(Session session, ResetFrame frame)
981     {
982         try
983         {
984             listener.onReset(session, frame);
985         }
986         catch (Throwable x)
987         {
988             LOG.info("Failure while notifying listener " + listener, x);
989         }
990     }
991 
992     protected void notifyClose(Session session, GoAwayFrame frame)
993     {
994         try
995         {
996             listener.onClose(session, frame);
997         }
998         catch (Throwable x)
999         {
1000             LOG.info("Failure while notifying listener " + listener, x);
1001         }
1002     }
1003 
1004     protected boolean notifyIdleTimeout(Session session)
1005     {
1006         try
1007         {
1008             return listener.onIdleTimeout(session);
1009         }
1010         catch (Throwable x)
1011         {
1012             LOG.info("Failure while notifying listener " + listener, x);
1013             return true;
1014         }
1015     }
1016 
1017     protected void notifyFailure(Session session, Throwable failure)
1018     {
1019         try
1020         {
1021             listener.onFailure(session, failure);
1022         }
1023         catch (Throwable x)
1024         {
1025             LOG.info("Failure while notifying listener " + listener, x);
1026         }
1027     }
1028 
1029     @Override
1030     public String toString()
1031     {
1032         return String.format("%s@%x{l:%s <-> r:%s,queueSize=%d,sendWindow=%s,recvWindow=%s,streams=%d,%s}",
1033                 getClass().getSimpleName(),
1034                 hashCode(),
1035                 getEndPoint().getLocalAddress(),
1036                 getEndPoint().getRemoteAddress(),
1037                 flusher.getQueueSize(),
1038                 sendWindow,
1039                 recvWindow,
1040                 streams.size(),
1041                 closed);
1042     }
1043 
1044     private class ControlEntry extends HTTP2Flusher.Entry
1045     {
1046         private ControlEntry(Frame frame, IStream stream, Callback callback)
1047         {
1048             super(frame, stream, callback);
1049         }
1050 
1051         public Throwable generate(ByteBufferPool.Lease lease)
1052         {
1053             try
1054             {
1055                 generator.control(lease, frame);
1056                 if (LOG.isDebugEnabled())
1057                     LOG.debug("Generated {}", frame);
1058                 prepare();
1059                 return null;
1060             }
1061             catch (Throwable x)
1062             {
1063                 if (LOG.isDebugEnabled())
1064                     LOG.debug("Failure generating frame " + frame, x);
1065                 return x;
1066             }
1067         }
1068 
1069         /**
1070          * <p>Performs actions just before writing the frame to the network.</p>
1071          * <p>Some frame, when sent over the network, causes the receiver
1072          * to react and send back frames that may be processed by the original
1073          * sender *before* {@link #succeeded()} is called.
1074          * <p>If the action to perform updates some state, this update may
1075          * not be seen by the received frames and cause errors.</p>
1076          * <p>For example, suppose the action updates the stream window to a
1077          * larger value; the sender sends the frame; the receiver is now entitled
1078          * to send back larger data; when the data is received by the original
1079          * sender, the action may have not been performed yet, causing the larger
1080          * data to be rejected, when it should have been accepted.</p>
1081          */
1082         private void prepare()
1083         {
1084             switch (frame.getType())
1085             {
1086                 case SETTINGS:
1087                 {
1088                     SettingsFrame settingsFrame = (SettingsFrame)frame;
1089                     Integer initialWindow = settingsFrame.getSettings().get(SettingsFrame.INITIAL_WINDOW_SIZE);
1090                     if (initialWindow != null)
1091                         flowControl.updateInitialStreamWindow(HTTP2Session.this, initialWindow, true);
1092                     break;
1093                 }
1094                 default:
1095                 {
1096                     break;
1097                 }
1098             }
1099         }
1100 
1101         @Override
1102         public void succeeded()
1103         {
1104             switch (frame.getType())
1105             {
1106                 case HEADERS:
1107                 {
1108                     HeadersFrame headersFrame = (HeadersFrame)frame;
1109                     if (stream.updateClose(headersFrame.isEndStream(), true))
1110                         removeStream(stream);
1111                     break;
1112                 }
1113                 case RST_STREAM:
1114                 {
1115                     if (stream != null)
1116                     {
1117                         stream.close();
1118                         removeStream(stream);
1119                     }
1120                     break;
1121                 }
1122                 case PUSH_PROMISE:
1123                 {
1124                     // Pushed streams are implicitly remotely closed.
1125                     // They are closed when sending an end-stream DATA frame.
1126                     stream.updateClose(true, false);
1127                     break;
1128                 }
1129                 case GO_AWAY:
1130                 {
1131                     // We just sent a GO_AWAY, only shutdown the
1132                     // output without closing yet, to allow reads.
1133                     getEndPoint().shutdownOutput();
1134                     break;
1135                 }
1136                 case WINDOW_UPDATE:
1137                 {
1138                     flowControl.windowUpdate(HTTP2Session.this, stream, (WindowUpdateFrame)frame);
1139                     break;
1140                 }
1141                 case DISCONNECT:
1142                 {
1143                     terminate();
1144                     break;
1145                 }
1146                 default:
1147                 {
1148                     break;
1149                 }
1150             }
1151             callback.succeeded();
1152         }
1153     }
1154 
1155     private class DataEntry extends HTTP2Flusher.Entry
1156     {
1157         private int length;
1158 
1159         private DataEntry(DataFrame frame, IStream stream, Callback callback)
1160         {
1161             super(frame, stream, callback);
1162         }
1163 
1164         @Override
1165         public int dataRemaining()
1166         {
1167             // We don't do any padding, so the flow control length is
1168             // always the data remaining. This simplifies the handling
1169             // of data frames that cannot be completely written due to
1170             // the flow control window exhausting, since in that case
1171             // we would have to count the padding only once.
1172             return ((DataFrame)frame).remaining();
1173         }
1174 
1175         public Throwable generate(ByteBufferPool.Lease lease)
1176         {
1177             try
1178             {
1179                 int flowControlLength = dataRemaining();
1180 
1181                 int sessionSendWindow = getSendWindow();
1182                 if (sessionSendWindow < 0)
1183                     throw new IllegalStateException();
1184 
1185                 int streamSendWindow = stream.updateSendWindow(0);
1186                 if (streamSendWindow < 0)
1187                     throw new IllegalStateException();
1188 
1189                 int window = Math.min(streamSendWindow, sessionSendWindow);
1190 
1191                 int length = this.length = Math.min(flowControlLength, window);
1192                 if (LOG.isDebugEnabled())
1193                     LOG.debug("Generated {}, length/window={}/{}", frame, length, window);
1194 
1195                 generator.data(lease, (DataFrame)frame, length);
1196                 flowControl.onDataSending(stream, length);
1197                 return null;
1198             }
1199             catch (Throwable x)
1200             {
1201                 if (LOG.isDebugEnabled())
1202                     LOG.debug("Failure generating frame " + frame, x);
1203                 return x;
1204             }
1205         }
1206 
1207         @Override
1208         public void succeeded()
1209         {
1210             flowControl.onDataSent(stream, length);
1211             // Do we have more to send ?
1212             DataFrame dataFrame = (DataFrame)frame;
1213             if (dataFrame.remaining() > 0)
1214             {
1215                 // We have written part of the frame, but there is more to write.
1216                 // We need to keep the correct ordering of frames, to avoid that other
1217                 // frames for the same stream are written before this one is finished.
1218                 flusher.prepend(this);
1219             }
1220             else
1221             {
1222                 // Only now we can update the close state
1223                 // and eventually remove the stream.
1224                 if (stream.updateClose(dataFrame.isEndStream(), true))
1225                     removeStream(stream);
1226                 callback.succeeded();
1227             }
1228         }
1229     }
1230 
1231     private static class PromiseCallback<C> implements Callback
1232     {
1233         private final Promise<C> promise;
1234         private final C value;
1235 
1236         private PromiseCallback(Promise<C> promise, C value)
1237         {
1238             this.promise = promise;
1239             this.value = value;
1240         }
1241 
1242         @Override
1243         public void succeeded()
1244         {
1245             promise.succeeded(value);
1246         }
1247 
1248         @Override
1249         public void failed(Throwable x)
1250         {
1251             promise.failed(x);
1252         }
1253     }
1254 }