View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2;
20  
21  import java.io.IOException;
22  import java.nio.channels.ClosedChannelException;
23  import java.nio.charset.StandardCharsets;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.ConcurrentMap;
31  import java.util.concurrent.TimeoutException;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicReference;
34  
35  import org.eclipse.jetty.http2.api.Session;
36  import org.eclipse.jetty.http2.api.Stream;
37  import org.eclipse.jetty.http2.frames.DataFrame;
38  import org.eclipse.jetty.http2.frames.DisconnectFrame;
39  import org.eclipse.jetty.http2.frames.Frame;
40  import org.eclipse.jetty.http2.frames.FrameType;
41  import org.eclipse.jetty.http2.frames.GoAwayFrame;
42  import org.eclipse.jetty.http2.frames.HeadersFrame;
43  import org.eclipse.jetty.http2.frames.PingFrame;
44  import org.eclipse.jetty.http2.frames.PriorityFrame;
45  import org.eclipse.jetty.http2.frames.PushPromiseFrame;
46  import org.eclipse.jetty.http2.frames.ResetFrame;
47  import org.eclipse.jetty.http2.frames.SettingsFrame;
48  import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
49  import org.eclipse.jetty.http2.generator.Generator;
50  import org.eclipse.jetty.http2.parser.Parser;
51  import org.eclipse.jetty.io.ByteBufferPool;
52  import org.eclipse.jetty.io.EndPoint;
53  import org.eclipse.jetty.util.Atomics;
54  import org.eclipse.jetty.util.Callback;
55  import org.eclipse.jetty.util.CountingCallback;
56  import org.eclipse.jetty.util.Promise;
57  import org.eclipse.jetty.util.log.Log;
58  import org.eclipse.jetty.util.log.Logger;
59  import org.eclipse.jetty.util.thread.Scheduler;
60  
61  public abstract class HTTP2Session implements ISession, Parser.Listener
62  {
63      private static final Logger LOG = Log.getLogger(HTTP2Session.class);
64  
65      private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
66      private final AtomicInteger streamIds = new AtomicInteger();
67      private final AtomicInteger lastStreamId = new AtomicInteger();
68      private final AtomicInteger localStreamCount = new AtomicInteger();
69      private final AtomicInteger remoteStreamCount = new AtomicInteger();
70      private final AtomicInteger sendWindow = new AtomicInteger();
71      private final AtomicInteger recvWindow = new AtomicInteger();
72      private final AtomicReference<CloseState> closed = new AtomicReference<>(CloseState.NOT_CLOSED);
73      private final Scheduler scheduler;
74      private final EndPoint endPoint;
75      private final Generator generator;
76      private final Listener listener;
77      private final FlowControlStrategy flowControl;
78      private final HTTP2Flusher flusher;
79      private int maxLocalStreams;
80      private int maxRemoteStreams;
81      private long streamIdleTimeout;
82      private boolean pushEnabled;
83  
84      public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Listener listener, FlowControlStrategy flowControl, int initialStreamId)
85      {
86          this.scheduler = scheduler;
87          this.endPoint = endPoint;
88          this.generator = generator;
89          this.listener = listener;
90          this.flowControl = flowControl;
91          this.flusher = new HTTP2Flusher(this);
92          this.maxLocalStreams = -1;
93          this.maxRemoteStreams = -1;
94          this.streamIds.set(initialStreamId);
95          this.streamIdleTimeout = endPoint.getIdleTimeout();
96          this.sendWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
97          this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
98          this.pushEnabled = true; // SPEC: by default, push is enabled.
99      }
100 
101     public FlowControlStrategy getFlowControlStrategy()
102     {
103         return flowControl;
104     }
105 
106     public int getMaxLocalStreams()
107     {
108         return maxLocalStreams;
109     }
110 
111     public void setMaxLocalStreams(int maxLocalStreams)
112     {
113         this.maxLocalStreams = maxLocalStreams;
114     }
115 
116     public int getMaxRemoteStreams()
117     {
118         return maxRemoteStreams;
119     }
120 
121     public void setMaxRemoteStreams(int maxRemoteStreams)
122     {
123         this.maxRemoteStreams = maxRemoteStreams;
124     }
125 
126     public long getStreamIdleTimeout()
127     {
128         return streamIdleTimeout;
129     }
130 
131     public void setStreamIdleTimeout(long streamIdleTimeout)
132     {
133         this.streamIdleTimeout = streamIdleTimeout;
134     }
135 
136     public EndPoint getEndPoint()
137     {
138         return endPoint;
139     }
140 
141     public Generator getGenerator()
142     {
143         return generator;
144     }
145 
146     @Override
147     public void onData(final DataFrame frame)
148     {
149         if (LOG.isDebugEnabled())
150             LOG.debug("Received {}", frame);
151 
152         int streamId = frame.getStreamId();
153         final IStream stream = getStream(streamId);
154 
155         // SPEC: the session window must be updated even if the stream is null.
156         // The flow control length includes the padding bytes.
157         final int flowControlLength = frame.remaining() + frame.padding();
158         flowControl.onDataReceived(this, stream, flowControlLength);
159 
160         if (stream != null)
161         {
162             if (getRecvWindow() < 0)
163             {
164                 close(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", Callback.NOOP);
165             }
166             else
167             {
168                 stream.process(frame, new Callback()
169                 {
170                     @Override
171                     public void succeeded()
172                     {
173                         flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
174                     }
175 
176                     @Override
177                     public void failed(Throwable x)
178                     {
179                         // Consume also in case of failures, to free the
180                         // session flow control window for other streams.
181                         flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength);
182                     }
183                 });
184             }
185         }
186         else
187         {
188             if (LOG.isDebugEnabled())
189                 LOG.debug("Ignoring {}, stream #{} not found", frame, streamId);
190             // We must enlarge the session flow control window,
191             // otherwise other requests will be stalled.
192             flowControl.onDataConsumed(this, null, flowControlLength);
193         }
194     }
195 
196     @Override
197     public abstract void onHeaders(HeadersFrame frame);
198 
199     @Override
200     public void onPriority(PriorityFrame frame)
201     {
202         if (LOG.isDebugEnabled())
203             LOG.debug("Received {}", frame);
204     }
205 
206     @Override
207     public void onReset(ResetFrame frame)
208     {
209         if (LOG.isDebugEnabled())
210             LOG.debug("Received {}", frame);
211 
212         IStream stream = getStream(frame.getStreamId());
213         if (stream != null)
214             stream.process(frame, Callback.NOOP);
215         else
216             notifyReset(this, frame);
217     }
218 
219     @Override
220     public void onSettings(SettingsFrame frame)
221     {
222         // SPEC: SETTINGS frame MUST be replied.
223         onSettings(frame, true);
224     }
225 
226     public void onSettings(SettingsFrame frame, boolean reply)
227     {
228         if (LOG.isDebugEnabled())
229             LOG.debug("Received {}", frame);
230 
231         if (frame.isReply())
232             return;
233 
234         // Iterate over all settings
235         for (Map.Entry<Integer, Integer> entry : frame.getSettings().entrySet())
236         {
237             int key = entry.getKey();
238             int value = entry.getValue();
239             switch (key)
240             {
241                 case SettingsFrame.HEADER_TABLE_SIZE:
242                 {
243                     if (LOG.isDebugEnabled())
244                         LOG.debug("Update HPACK header table size to {}", value);
245                     generator.setHeaderTableSize(value);
246                     break;
247                 }
248                 case SettingsFrame.ENABLE_PUSH:
249                 {
250                     // SPEC: check the value is sane.
251                     if (value != 0 && value != 1)
252                     {
253                         onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_enable_push");
254                         return;
255                     }
256                     pushEnabled = value == 1;
257                     break;
258                 }
259                 case SettingsFrame.MAX_CONCURRENT_STREAMS:
260                 {
261                     maxLocalStreams = value;
262                     if (LOG.isDebugEnabled())
263                         LOG.debug("Update max local concurrent streams to {}", maxLocalStreams);
264                     break;
265                 }
266                 case SettingsFrame.INITIAL_WINDOW_SIZE:
267                 {
268                     if (LOG.isDebugEnabled())
269                         LOG.debug("Update initial window size to {}", value);
270                     flowControl.updateInitialStreamWindow(this, value, false);
271                     break;
272                 }
273                 case SettingsFrame.MAX_FRAME_SIZE:
274                 {
275                     if (LOG.isDebugEnabled())
276                         LOG.debug("Update max frame size to {}", value);
277                     // SPEC: check the max frame size is sane.
278                     if (value < Frame.DEFAULT_MAX_LENGTH || value > Frame.MAX_MAX_LENGTH)
279                     {
280                         onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_max_frame_size");
281                         return;
282                     }
283                     generator.setMaxFrameSize(value);
284                     break;
285                 }
286                 case SettingsFrame.MAX_HEADER_LIST_SIZE:
287                 {
288                     // TODO implement
289                     LOG.warn("NOT IMPLEMENTED max header list size to {}", value);
290                     break;
291                 }
292                 default:
293                 {
294                     LOG.debug("Unknown setting {}:{}", key, value);
295                     break;
296                 }
297             }
298         }
299         notifySettings(this, frame);
300 
301         if (reply)
302         {
303             SettingsFrame replyFrame = new SettingsFrame(Collections.<Integer, Integer>emptyMap(), true);
304             settings(replyFrame, Callback.NOOP);
305         }
306     }
307 
308     @Override
309     public void onPing(PingFrame frame)
310     {
311         if (LOG.isDebugEnabled())
312             LOG.debug("Received {}", frame);
313 
314         if (frame.isReply())
315         {
316             notifyPing(this, frame);
317         }
318         else
319         {
320             PingFrame reply = new PingFrame(frame.getPayload(), true);
321             control(null, Callback.NOOP, reply);
322         }
323     }
324 
325     /**
326      * This method is called when receiving a GO_AWAY from the other peer.
327      * We check the close state to act appropriately:
328      *
329      * * NOT_CLOSED: we move to REMOTELY_CLOSED and queue a disconnect, so
330      *   that the content of the queue is written, and then the connection
331      *   closed. We notify the application after being terminated.
332      *   See <code>HTTP2Session.ControlEntry#succeeded()</code>
333      *
334      * * In all other cases, we do nothing since other methods are already
335      *   performing their actions.
336      *
337      * @param frame the GO_AWAY frame that has been received.
338      * @see #close(int, String, Callback)
339      * @see #onShutdown()
340      * @see #onIdleTimeout()
341      */
342     @Override
343     public void onGoAway(final GoAwayFrame frame)
344     {
345         if (LOG.isDebugEnabled())
346             LOG.debug("Received {}", frame);
347 
348         while (true)
349         {
350             CloseState current = closed.get();
351             switch (current)
352             {
353                 case NOT_CLOSED:
354                 {
355                     if (closed.compareAndSet(current, CloseState.REMOTELY_CLOSED))
356                     {
357                         // We received a GO_AWAY, so try to write
358                         // what's in the queue and then disconnect.
359                         control(null, new Callback()
360                         {
361                             @Override
362                             public void succeeded()
363                             {
364                                 notifyClose(HTTP2Session.this, frame);
365                             }
366 
367                             @Override
368                             public void failed(Throwable x)
369                             {
370                                 notifyClose(HTTP2Session.this, frame);
371                             }
372                         }, new DisconnectFrame());
373                         return;
374                     }
375                     break;
376                 }
377                 default:
378                 {
379                     if (LOG.isDebugEnabled())
380                         LOG.debug("Ignored {}, already closed", frame);
381                     return;
382                 }
383             }
384         }
385     }
386 
387     @Override
388     public void onWindowUpdate(WindowUpdateFrame frame)
389     {
390         if (LOG.isDebugEnabled())
391             LOG.debug("Received {}", frame);
392 
393         int streamId = frame.getStreamId();
394         if (streamId > 0)
395         {
396             IStream stream = getStream(streamId);
397             if (stream != null)
398                 onWindowUpdate(stream, frame);
399         }
400         else
401         {
402             onWindowUpdate(null, frame);
403         }
404     }
405 
406     @Override
407     public void onConnectionFailure(int error, String reason)
408     {
409         close(error, reason, Callback.NOOP);
410         notifyFailure(this, new IOException(String.format("%d/%s", error, reason)));
411     }
412 
413     @Override
414     public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listener listener)
415     {
416         // Synchronization is necessary to atomically create
417         // the stream id and enqueue the frame to be sent.
418         boolean queued;
419         synchronized (this)
420         {
421             int streamId = frame.getStreamId();
422             if (streamId <= 0)
423             {
424                 streamId = streamIds.getAndAdd(2);
425                 PriorityFrame priority = frame.getPriority();
426                 priority = priority == null ? null : new PriorityFrame(streamId, priority.getParentStreamId(),
427                         priority.getWeight(), priority.isExclusive());
428                 frame = new HeadersFrame(streamId, frame.getMetaData(), priority, frame.isEndStream());
429             }
430             final IStream stream = createLocalStream(streamId, promise);
431             if (stream == null)
432                 return;
433             stream.setListener(listener);
434 
435             ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream));
436             queued = flusher.append(entry);
437         }
438         // Iterate outside the synchronized block.
439         if (queued)
440             flusher.iterate();
441     }
442 
443     @Override
444     public int priority(PriorityFrame frame, Callback callback)
445     {
446         int streamId = frame.getStreamId();
447         IStream stream = streams.get(streamId);
448         if (stream == null)
449         {
450             streamId = streamIds.getAndAdd(2);
451             frame = new PriorityFrame(streamId, frame.getParentStreamId(),
452                     frame.getWeight(), frame.isExclusive());
453         }
454         control(stream, callback, frame);
455         return streamId;
456     }
457 
458     @Override
459     public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame, Stream.Listener listener)
460     {
461         // Synchronization is necessary to atomically create
462         // the stream id and enqueue the frame to be sent.
463         boolean queued;
464         synchronized (this)
465         {
466             int streamId = streamIds.getAndAdd(2);
467             frame = new PushPromiseFrame(frame.getStreamId(), streamId, frame.getMetaData());
468 
469             final IStream pushStream = createLocalStream(streamId, promise);
470             if (pushStream == null)
471                 return;
472             pushStream.setListener(listener);
473 
474             ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream));
475             queued = flusher.append(entry);
476         }
477         // Iterate outside the synchronized block.
478         if (queued)
479             flusher.iterate();
480     }
481 
482 
483     @Override
484     public void settings(SettingsFrame frame, Callback callback)
485     {
486         control(null, callback, frame);
487     }
488 
489     @Override
490     public void ping(PingFrame frame, Callback callback)
491     {
492         if (frame.isReply())
493             callback.failed(new IllegalArgumentException());
494         else
495             control(null, callback, frame);
496     }
497 
498     protected void reset(ResetFrame frame, Callback callback)
499     {
500         control(getStream(frame.getStreamId()), callback, frame);
501     }
502 
503     /**
504      * Invoked internally and by applications to send a GO_AWAY frame to the
505      * other peer. We check the close state to act appropriately:
506      *
507      * * NOT_CLOSED: we move to LOCALLY_CLOSED and queue a GO_AWAY. When the
508      *   GO_AWAY has been written, it will only cause the output to be shut
509      *   down (not the connection closed), so that the application can still
510      *   read frames arriving from the other peer.
511      *   Ideally the other peer will notice the GO_AWAY and close the connection.
512      *   When that happen, we close the connection from {@link #onShutdown()}.
513      *   Otherwise, the idle timeout mechanism will close the connection, see
514      *   {@link #onIdleTimeout()}.
515      *
516      * * In all other cases, we do nothing since other methods are already
517      *   performing their actions.
518      *
519      * @param error the error code
520      * @param reason the reason
521      * @param callback the callback to invoke when the operation is complete
522      * @see #onGoAway(GoAwayFrame)
523      * @see #onShutdown()
524      * @see #onIdleTimeout()
525      */
526     @Override
527     public boolean close(int error, String reason, Callback callback)
528     {
529         while (true)
530         {
531             CloseState current = closed.get();
532             switch (current)
533             {
534                 case NOT_CLOSED:
535                 {
536                     if (closed.compareAndSet(current, CloseState.LOCALLY_CLOSED))
537                     {
538                         byte[] payload = reason == null ? null : reason.getBytes(StandardCharsets.UTF_8);
539                         GoAwayFrame frame = new GoAwayFrame(lastStreamId.get(), error, payload);
540                         if (LOG.isDebugEnabled())
541                             LOG.debug("Sending {}", frame);
542                         control(null, callback, frame);
543                         return true;
544                     }
545                     break;
546                 }
547                 default:
548                 {
549                     if (LOG.isDebugEnabled())
550                         LOG.debug("Ignoring close {}/{}, already closed", error, reason);
551                     callback.succeeded();
552                     return false;
553                 }
554             }
555         }
556     }
557 
558     @Override
559     public boolean isClosed()
560     {
561         return closed.get() != CloseState.NOT_CLOSED;
562     }
563 
564     private void control(IStream stream, Callback callback, Frame frame)
565     {
566         frames(stream, callback, frame, Frame.EMPTY_ARRAY);
567     }
568 
569     @Override
570     public void frames(IStream stream, Callback callback, Frame frame, Frame... frames)
571     {
572         // We want to generate as late as possible to allow re-prioritization;
573         // generation will happen while processing the entries.
574 
575         // The callback needs to be notified only when the last frame completes.
576 
577         int length = frames.length;
578         if (length == 0)
579         {
580             frame(new ControlEntry(frame, stream, callback), true);
581         }
582         else
583         {
584             callback = new CountingCallback(callback, 1 + length);
585             frame(new ControlEntry(frame, stream, callback), false);
586             for (int i = 1; i <= length; ++i)
587                 frame(new ControlEntry(frames[i - 1], stream, callback), i == length);
588         }
589     }
590 
591     @Override
592     public void data(IStream stream, Callback callback, DataFrame frame)
593     {
594         // We want to generate as late as possible to allow re-prioritization.
595         frame(new DataEntry(frame, stream, callback), true);
596     }
597 
598     private void frame(HTTP2Flusher.Entry entry, boolean flush)
599     {
600         if (LOG.isDebugEnabled())
601             LOG.debug("Sending {}", entry.frame);
602         // Ping frames are prepended to process them as soon as possible.
603         boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry);
604         if (queued && flush)
605             flusher.iterate();
606     }
607 
608     protected IStream createLocalStream(int streamId, Promise<Stream> promise)
609     {
610         while (true)
611         {
612             int localCount = localStreamCount.get();
613             int maxCount = maxLocalStreams;
614             if (maxCount >= 0 && localCount >= maxCount)
615             {
616                 promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
617                 return null;
618             }
619             if (localStreamCount.compareAndSet(localCount, localCount + 1))
620                 break;
621         }
622 
623         IStream stream = newStream(streamId);
624         if (streams.putIfAbsent(streamId, stream) == null)
625         {
626             stream.setIdleTimeout(getStreamIdleTimeout());
627             flowControl.onStreamCreated(stream, true);
628             if (LOG.isDebugEnabled())
629                 LOG.debug("Created local {}", stream);
630             return stream;
631         }
632         else
633         {
634             promise.failed(new IllegalStateException("Duplicate stream " + streamId));
635             return null;
636         }
637     }
638 
639     protected IStream createRemoteStream(int streamId)
640     {
641         // SPEC: exceeding max concurrent streams is treated as stream error.
642         while (true)
643         {
644             int remoteCount = remoteStreamCount.get();
645             int maxCount = getMaxRemoteStreams();
646             if (maxCount >= 0 && remoteCount >= maxCount)
647             {
648                 reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP);
649                 return null;
650             }
651             if (remoteStreamCount.compareAndSet(remoteCount, remoteCount + 1))
652                 break;
653         }
654 
655         IStream stream = newStream(streamId);
656 
657         // SPEC: duplicate stream is treated as connection error.
658         if (streams.putIfAbsent(streamId, stream) == null)
659         {
660             updateLastStreamId(streamId);
661             stream.setIdleTimeout(getStreamIdleTimeout());
662             flowControl.onStreamCreated(stream, false);
663             if (LOG.isDebugEnabled())
664                 LOG.debug("Created remote {}", stream);
665             return stream;
666         }
667         else
668         {
669             close(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream", Callback.NOOP);
670             return null;
671         }
672     }
673 
674     protected IStream newStream(int streamId)
675     {
676         return new HTTP2Stream(scheduler, this, streamId);
677     }
678 
679     @Override
680     public void removeStream(IStream stream, boolean local)
681     {
682         IStream removed = streams.remove(stream.getId());
683         if (removed != null)
684         {
685             assert removed == stream;
686 
687             if (local)
688                 localStreamCount.decrementAndGet();
689             else
690                 remoteStreamCount.decrementAndGet();
691 
692             flowControl.onStreamDestroyed(stream, local);
693 
694             if (LOG.isDebugEnabled())
695                 LOG.debug("Removed {}", stream);
696         }
697     }
698 
699     @Override
700     public Collection<Stream> getStreams()
701     {
702         List<Stream> result = new ArrayList<>();
703         result.addAll(streams.values());
704         return result;
705     }
706 
707     @Override
708     public IStream getStream(int streamId)
709     {
710         return streams.get(streamId);
711     }
712 
713     public int getSendWindow()
714     {
715         return sendWindow.get();
716     }
717 
718     public int getRecvWindow()
719     {
720         return recvWindow.get();
721     }
722 
723     @Override
724     public int updateSendWindow(int delta)
725     {
726         return sendWindow.getAndAdd(delta);
727     }
728 
729     @Override
730     public int updateRecvWindow(int delta)
731     {
732         return recvWindow.getAndAdd(delta);
733     }
734 
735     @Override
736     public void onWindowUpdate(IStream stream, WindowUpdateFrame frame)
737     {
738         // WindowUpdateFrames arrive concurrently with writes.
739         // Increasing (or reducing) the window size concurrently
740         // with writes requires coordination with the flusher, that
741         // decides how many frames to write depending on the available
742         // window sizes. If the window sizes vary concurrently, the
743         // flusher may take non-optimal or wrong decisions.
744         // Here, we "queue" window updates to the flusher, so it will
745         // be the only component responsible for window updates, for
746         // both increments and reductions.
747         flusher.window(stream, frame);
748     }
749 
750     @Override
751     public boolean isPushEnabled()
752     {
753         return pushEnabled;
754     }
755 
756     /**
757      * A typical close by a remote peer involves a GO_AWAY frame followed by TCP FIN.
758      * This method is invoked when the TCP FIN is received, or when an exception is
759      * thrown while reading, and we check the close state to act appropriately:
760      *
761      * * NOT_CLOSED: means that the remote peer did not send a GO_AWAY (abrupt close)
762      *   or there was an exception while reading, and therefore we terminate.
763      *
764      * * LOCALLY_CLOSED: we have sent the GO_AWAY to the remote peer, which received
765      *   it and closed the connection; we queue a disconnect to close the connection
766      *   on the local side.
767      *   The GO_AWAY just shutdown the output, so we need this step to make sure the
768      *   connection is closed. See {@link #close(int, String, Callback)}.
769      *
770      * * REMOTELY_CLOSED: we received the GO_AWAY, and the TCP FIN afterwards, so we
771      *   do nothing since the handling of the GO_AWAY will take care of closing the
772      *   connection. See {@link #onGoAway(GoAwayFrame)}.
773      *
774      * @see #onGoAway(GoAwayFrame)
775      * @see #close(int, String, Callback)
776      * @see #onIdleTimeout()
777      */
778     @Override
779     public void onShutdown()
780     {
781         if (LOG.isDebugEnabled())
782             LOG.debug("Shutting down {}", this);
783 
784         switch (closed.get())
785         {
786             case NOT_CLOSED:
787             {
788                 // The other peer did not send a GO_AWAY, no need to be gentle.
789                 if (LOG.isDebugEnabled())
790                     LOG.debug("Abrupt close for {}", this);
791                 abort(new ClosedChannelException());
792                 break;
793             }
794             case LOCALLY_CLOSED:
795             {
796                 // We have closed locally, and only shutdown
797                 // the output; now queue a disconnect.
798                 control(null, Callback.NOOP, new DisconnectFrame());
799                 break;
800             }
801             case REMOTELY_CLOSED:
802             {
803                 // Nothing to do, the GO_AWAY frame we
804                 // received will close the connection.
805                 break;
806             }
807             default:
808             {
809                 break;
810             }
811         }
812     }
813 
814     /**
815      * This method is invoked when the idle timeout triggers. We check the close state
816      * to act appropriately:
817      *
818      * * NOT_CLOSED: it's a real idle timeout, we just initiate a close, see
819      *   {@link #close(int, String, Callback)}.
820      *
821      * * LOCALLY_CLOSED: we have sent a GO_AWAY and only shutdown the output, but the
822      *   other peer did not close the connection so we never received the TCP FIN, and
823      *   therefore we terminate.
824      *
825      * * REMOTELY_CLOSED: the other peer sent us a GO_AWAY, we should have queued a
826      *   disconnect, but for some reason it was not processed (for example, queue was
827      *   stuck because of TCP congestion), therefore we terminate.
828      *   See {@link #onGoAway(GoAwayFrame)}.
829      *
830      * @see #onGoAway(GoAwayFrame)
831      * @see #close(int, String, Callback)
832      * @see #onShutdown()
833      */
834     @Override
835     public void onIdleTimeout()
836     {
837         switch (closed.get())
838         {
839             case NOT_CLOSED:
840             {
841                 // Real idle timeout, just close.
842                 close(ErrorCode.NO_ERROR.code, "idle_timeout", Callback.NOOP);
843                 break;
844             }
845             case LOCALLY_CLOSED:
846             case REMOTELY_CLOSED:
847             {
848                 abort(new TimeoutException());
849                 break;
850             }
851             default:
852             {
853                 break;
854             }
855         }
856     }
857 
858     @Override
859     public void onFrame(Frame frame)
860     {
861         onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "upgrade");
862     }
863 
864     public void disconnect()
865     {
866         if (LOG.isDebugEnabled())
867             LOG.debug("Disconnecting {}", this);
868         endPoint.close();
869     }
870 
871     private void terminate()
872     {
873         while (true)
874         {
875             CloseState current = closed.get();
876             switch (current)
877             {
878                 case NOT_CLOSED:
879                 case LOCALLY_CLOSED:
880                 case REMOTELY_CLOSED:
881                 {
882                     if (closed.compareAndSet(current, CloseState.CLOSED))
883                     {
884                         flusher.close();
885                         for (IStream stream : streams.values())
886                             stream.close();
887                         streams.clear();
888                         disconnect();
889                         return;
890                     }
891                     break;
892                 }
893                 default:
894                 {
895                     return;
896                 }
897             }
898         }
899     }
900 
901     protected void abort(Throwable failure)
902     {
903         terminate();
904         notifyFailure(this, failure);
905     }
906 
907     public boolean isDisconnected()
908     {
909         return !endPoint.isOpen();
910     }
911 
912     private void updateLastStreamId(int streamId)
913     {
914         Atomics.updateMax(lastStreamId, streamId);
915     }
916 
917     protected Stream.Listener notifyNewStream(Stream stream, HeadersFrame frame)
918     {
919         try
920         {
921             return listener.onNewStream(stream, frame);
922         }
923         catch (Throwable x)
924         {
925             LOG.info("Failure while notifying listener " + listener, x);
926             return null;
927         }
928     }
929 
930     protected void notifySettings(Session session, SettingsFrame frame)
931     {
932         try
933         {
934             listener.onSettings(session, frame);
935         }
936         catch (Throwable x)
937         {
938             LOG.info("Failure while notifying listener " + listener, x);
939         }
940     }
941 
942     protected void notifyPing(Session session, PingFrame frame)
943     {
944         try
945         {
946             listener.onPing(session, frame);
947         }
948         catch (Throwable x)
949         {
950             LOG.info("Failure while notifying listener " + listener, x);
951         }
952     }
953 
954     protected void notifyReset(Session session, ResetFrame frame)
955     {
956         try
957         {
958             listener.onReset(session, frame);
959         }
960         catch (Throwable x)
961         {
962             LOG.info("Failure while notifying listener " + listener, x);
963         }
964     }
965 
966     protected void notifyClose(Session session, GoAwayFrame frame)
967     {
968         try
969         {
970             listener.onClose(session, frame);
971         }
972         catch (Throwable x)
973         {
974             LOG.info("Failure while notifying listener " + listener, x);
975         }
976     }
977 
978     protected void notifyFailure(Session session, Throwable failure)
979     {
980         try
981         {
982             listener.onFailure(session, failure);
983         }
984         catch (Throwable x)
985         {
986             LOG.info("Failure while notifying listener " + listener, x);
987         }
988     }
989 
990     @Override
991     public String toString()
992     {
993         return String.format("%s@%x{queueSize=%d,sendWindow=%s,recvWindow=%s,streams=%d,%s}", getClass().getSimpleName(),
994                 hashCode(), flusher.getQueueSize(), sendWindow, recvWindow, streams.size(), closed);
995     }
996 
997     private class ControlEntry extends HTTP2Flusher.Entry
998     {
999         private ControlEntry(Frame frame, IStream stream, Callback callback)
1000         {
1001             super(frame, stream, callback);
1002         }
1003 
1004         public Throwable generate(ByteBufferPool.Lease lease)
1005         {
1006             try
1007             {
1008                 generator.control(lease, frame);
1009                 if (LOG.isDebugEnabled())
1010                     LOG.debug("Generated {}", frame);
1011                 prepare();
1012                 return null;
1013             }
1014             catch (Throwable x)
1015             {
1016                 if (LOG.isDebugEnabled())
1017                     LOG.debug("Failure generating frame " + frame, x);
1018                 return x;
1019             }
1020         }
1021 
1022         /**
1023          * <p>Performs actions just before writing the frame to the network.</p>
1024          * <p>Some frame, when sent over the network, causes the receiver
1025          * to react and send back frames that may be processed by the original
1026          * sender *before* {@link #succeeded()} is called.
1027          * <p>If the action to perform updates some state, this update may
1028          * not be seen by the received frames and cause errors.</p>
1029          * <p>For example, suppose the action updates the stream window to a
1030          * larger value; the sender sends the frame; the receiver is now entitled
1031          * to send back larger data; when the data is received by the original
1032          * sender, the action may have not been performed yet, causing the larger
1033          * data to be rejected, when it should have been accepted.</p>
1034          */
1035         private void prepare()
1036         {
1037             switch (frame.getType())
1038             {
1039                 case SETTINGS:
1040                 {
1041                     SettingsFrame settingsFrame = (SettingsFrame)frame;
1042                     Integer initialWindow = settingsFrame.getSettings().get(SettingsFrame.INITIAL_WINDOW_SIZE);
1043                     if (initialWindow != null)
1044                         flowControl.updateInitialStreamWindow(HTTP2Session.this, initialWindow, true);
1045                     break;
1046                 }
1047                 default:
1048                 {
1049                     break;
1050                 }
1051             }
1052         }
1053 
1054         @Override
1055         public void succeeded()
1056         {
1057             switch (frame.getType())
1058             {
1059                 case HEADERS:
1060                 {
1061                     HeadersFrame headersFrame = (HeadersFrame)frame;
1062                     if (stream.updateClose(headersFrame.isEndStream(), true))
1063                         removeStream(stream, true);
1064                     break;
1065                 }
1066                 case RST_STREAM:
1067                 {
1068                     if (stream != null)
1069                     {
1070                         stream.close();
1071                         removeStream(stream, true);
1072                     }
1073                     break;
1074                 }
1075                 case PUSH_PROMISE:
1076                 {
1077                     // Pushed streams are implicitly remotely closed.
1078                     // They are closed when sending an end-stream DATA frame.
1079                     stream.updateClose(true, false);
1080                     break;
1081                 }
1082                 case GO_AWAY:
1083                 {
1084                     // We just sent a GO_AWAY, only shutdown the
1085                     // output without closing yet, to allow reads.
1086                     getEndPoint().shutdownOutput();
1087                     break;
1088                 }
1089                 case WINDOW_UPDATE:
1090                 {
1091                     flowControl.windowUpdate(HTTP2Session.this, stream, (WindowUpdateFrame)frame);
1092                     break;
1093                 }
1094                 case DISCONNECT:
1095                 {
1096                     terminate();
1097                     break;
1098                 }
1099                 default:
1100                 {
1101                     break;
1102                 }
1103             }
1104             callback.succeeded();
1105         }
1106     }
1107 
1108     private class DataEntry extends HTTP2Flusher.Entry
1109     {
1110         private int length;
1111 
1112         private DataEntry(DataFrame frame, IStream stream, Callback callback)
1113         {
1114             super(frame, stream, callback);
1115         }
1116 
1117         @Override
1118         public int dataRemaining()
1119         {
1120             // We don't do any padding, so the flow control length is
1121             // always the data remaining. This simplifies the handling
1122             // of data frames that cannot be completely written due to
1123             // the flow control window exhausting, since in that case
1124             // we would have to count the padding only once.
1125             return ((DataFrame)frame).remaining();
1126         }
1127 
1128         public Throwable generate(ByteBufferPool.Lease lease)
1129         {
1130             try
1131             {
1132                 int flowControlLength = dataRemaining();
1133 
1134                 int sessionSendWindow = getSendWindow();
1135                 if (sessionSendWindow < 0)
1136                     throw new IllegalStateException();
1137 
1138                 int streamSendWindow = stream.updateSendWindow(0);
1139                 if (streamSendWindow < 0)
1140                     throw new IllegalStateException();
1141 
1142                 int window = Math.min(streamSendWindow, sessionSendWindow);
1143 
1144                 int length = this.length = Math.min(flowControlLength, window);
1145                 if (LOG.isDebugEnabled())
1146                     LOG.debug("Generated {}, length/window={}/{}", frame, length, window);
1147 
1148                 generator.data(lease, (DataFrame)frame, length);
1149                 flowControl.onDataSending(stream, length);
1150                 return null;
1151             }
1152             catch (Throwable x)
1153             {
1154                 if (LOG.isDebugEnabled())
1155                     LOG.debug("Failure generating frame " + frame, x);
1156                 return x;
1157             }
1158         }
1159 
1160         @Override
1161         public void succeeded()
1162         {
1163             flowControl.onDataSent(stream, length);
1164             // Do we have more to send ?
1165             DataFrame dataFrame = (DataFrame)frame;
1166             if (dataFrame.remaining() > 0)
1167             {
1168                 // We have written part of the frame, but there is more to write.
1169                 // We need to keep the correct ordering of frames, to avoid that other
1170                 // frames for the same stream are written before this one is finished.
1171                 flusher.prepend(this);
1172             }
1173             else
1174             {
1175                 // Only now we can update the close state
1176                 // and eventually remove the stream.
1177                 if (stream.updateClose(dataFrame.isEndStream(), true))
1178                     removeStream(stream, true);
1179                 callback.succeeded();
1180             }
1181         }
1182     }
1183 
1184     private static class PromiseCallback<C> implements Callback
1185     {
1186         private final Promise<C> promise;
1187         private final C value;
1188 
1189         private PromiseCallback(Promise<C> promise, C value)
1190         {
1191             this.promise = promise;
1192             this.value = value;
1193         }
1194 
1195         @Override
1196         public void succeeded()
1197         {
1198             promise.succeeded(value);
1199         }
1200 
1201         @Override
1202         public void failed(Throwable x)
1203         {
1204             promise.failed(x);
1205         }
1206     }
1207 }