View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.nio.ByteBuffer;
24  import java.nio.channels.InterruptedByTimeoutException;
25  import java.util.Collections;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Objects;
30  import java.util.Set;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.ConcurrentMap;
33  import java.util.concurrent.CopyOnWriteArrayList;
34  import java.util.concurrent.ExecutionException;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.TimeoutException;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicInteger;
39  
40  import org.eclipse.jetty.io.ByteBufferPool;
41  import org.eclipse.jetty.io.EndPoint;
42  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
43  import org.eclipse.jetty.spdy.api.DataInfo;
44  import org.eclipse.jetty.spdy.api.GoAwayInfo;
45  import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
46  import org.eclipse.jetty.spdy.api.PingInfo;
47  import org.eclipse.jetty.spdy.api.PingResultInfo;
48  import org.eclipse.jetty.spdy.api.PushInfo;
49  import org.eclipse.jetty.spdy.api.RstInfo;
50  import org.eclipse.jetty.spdy.api.SPDYException;
51  import org.eclipse.jetty.spdy.api.Session;
52  import org.eclipse.jetty.spdy.api.SessionFrameListener;
53  import org.eclipse.jetty.spdy.api.SessionStatus;
54  import org.eclipse.jetty.spdy.api.Settings;
55  import org.eclipse.jetty.spdy.api.SettingsInfo;
56  import org.eclipse.jetty.spdy.api.Stream;
57  import org.eclipse.jetty.spdy.api.StreamFrameListener;
58  import org.eclipse.jetty.spdy.api.StreamStatus;
59  import org.eclipse.jetty.spdy.api.SynInfo;
60  import org.eclipse.jetty.spdy.frames.ControlFrame;
61  import org.eclipse.jetty.spdy.frames.ControlFrameType;
62  import org.eclipse.jetty.spdy.frames.CredentialFrame;
63  import org.eclipse.jetty.spdy.frames.DataFrame;
64  import org.eclipse.jetty.spdy.frames.GoAwayFrame;
65  import org.eclipse.jetty.spdy.frames.HeadersFrame;
66  import org.eclipse.jetty.spdy.frames.PingFrame;
67  import org.eclipse.jetty.spdy.frames.RstStreamFrame;
68  import org.eclipse.jetty.spdy.frames.SettingsFrame;
69  import org.eclipse.jetty.spdy.frames.SynReplyFrame;
70  import org.eclipse.jetty.spdy.frames.SynStreamFrame;
71  import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
72  import org.eclipse.jetty.spdy.generator.Generator;
73  import org.eclipse.jetty.spdy.parser.Parser;
74  import org.eclipse.jetty.util.Atomics;
75  import org.eclipse.jetty.util.BufferUtil;
76  import org.eclipse.jetty.util.Callback;
77  import org.eclipse.jetty.util.FutureCallback;
78  import org.eclipse.jetty.util.FuturePromise;
79  import org.eclipse.jetty.util.Promise;
80  import org.eclipse.jetty.util.component.ContainerLifeCycle;
81  import org.eclipse.jetty.util.component.Dumpable;
82  import org.eclipse.jetty.util.log.Log;
83  import org.eclipse.jetty.util.log.Logger;
84  import org.eclipse.jetty.util.thread.Scheduler;
85  
86  public class StandardSession implements ISession, Parser.Listener, Dumpable
87  {
88      private static final Logger LOG = Log.getLogger(Session.class);
89  
90      private final Flusher flusher;
91      private final Map<String, Object> attributes = new ConcurrentHashMap<>();
92      private final List<Listener> listeners = new CopyOnWriteArrayList<>();
93      private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
94      private final ByteBufferPool bufferPool;
95      private final Scheduler scheduler;
96      private final short version;
97      private final Controller controller;
98      private final EndPoint endPoint;
99      private final IdleListener idleListener;
100     private final AtomicInteger streamIds;
101     private final AtomicInteger pingIds;
102     private final SessionFrameListener listener;
103     private final Generator generator;
104     private final AtomicBoolean goAwaySent = new AtomicBoolean();
105     private final AtomicBoolean goAwayReceived = new AtomicBoolean();
106     private final AtomicInteger lastStreamId = new AtomicInteger();
107     private final AtomicInteger localStreamCount = new AtomicInteger(0);
108     private final FlowControlStrategy flowControlStrategy;
109     private volatile int maxConcurrentLocalStreams = -1;
110 
/**
 * Creates a session and the flusher that writes through the given controller.
 *
 * @param version the protocol version reported by {@link #getVersion()} and used for generated frames
 * @param bufferPool the buffer pool kept by this session
 * @param scheduler the scheduler handed to the streams created by this session
 * @param controller the controller the flusher is built on; may be {@code null} in tests
 * @param endPoint the endpoint providing addresses and the idle timeout
 * @param idleListener notified around frame processing, may be {@code null}
 * @param initialStreamId the seed for both the stream id and the ping id counters
 * @param listener the session level frame listener, may be {@code null}
 * @param generator the frame generator kept by this session
 * @param flowControlStrategy the strategy informed about new streams, data and window updates
 */
public StandardSession(short version, ByteBufferPool bufferPool, Scheduler scheduler,
        Controller controller, EndPoint endPoint, IdleListener idleListener, int initialStreamId,
        SessionFrameListener listener, Generator generator, FlowControlStrategy flowControlStrategy)
{
    // TODO this should probably be an aggregate lifecycle

    // Protocol level configuration
    this.version = version;
    this.generator = generator;
    this.flowControlStrategy = flowControlStrategy;

    // I/O collaborators
    this.bufferPool = bufferPool;
    this.scheduler = scheduler;
    this.controller = controller;
    this.endPoint = endPoint;
    this.flusher = new Flusher(controller);

    // Listeners
    this.idleListener = idleListener;
    this.listener = listener;

    // Id counters share the same seed
    this.streamIds = new AtomicInteger(initialStreamId);
    this.pingIds = new AtomicInteger(initialStreamId);
}
129 
130     @Override
131     public short getVersion()
132     {
133         return version;
134     }
135 
136     @Override
137     public void addListener(Listener listener)
138     {
139         listeners.add(listener);
140     }
141 
142     @Override
143     public void removeListener(Listener listener)
144     {
145         listeners.remove(listener);
146     }
147 
148     @Override
149     public Stream syn(SynInfo synInfo, StreamFrameListener listener) throws ExecutionException, InterruptedException, TimeoutException
150     {
151         FuturePromise<Stream> result = new FuturePromise<>();
152         syn(synInfo, listener, result);
153         if (synInfo.getTimeout() > 0)
154             return result.get(synInfo.getTimeout(), synInfo.getUnit());
155         else
156             return result.get();
157     }
158 
159     @Override
160     public void syn(SynInfo synInfo, StreamFrameListener listener, Promise<Stream> promise)
161     {
162         // Synchronization is necessary.
163         // SPEC v3, 2.3.1 requires that the stream creation be monotonically crescent
164         // so we cannot allow thread1 to create stream1 and thread2 create stream3 and
165         // have stream3 hit the network before stream1, not only to comply with the spec
166         // but also because the compression context for the headers would be wrong, as the
167         // frame with a compression history will come before the first compressed frame.
168         int associatedStreamId = 0;
169         if (synInfo instanceof PushSynInfo)
170             associatedStreamId = ((PushSynInfo)synInfo).getAssociatedStreamId();
171 
172         synchronized (this)
173         {
174             int streamId = streamIds.getAndAdd(2);
175             // TODO: for SPDYv3 we need to support the "slot" argument
176             SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, associatedStreamId, synInfo.getPriority(), (short)0, synInfo.getHeaders());
177             IStream stream = createStream(synStream, listener, true, promise);
178             if (stream == null)
179                 return;
180             generateAndEnqueueControlFrame(stream, synStream, synInfo.getTimeout(), synInfo.getUnit(), stream);
181         }
182     }
183 
184     @Override
185     public void rst(RstInfo rstInfo) throws InterruptedException, ExecutionException, TimeoutException
186     {
187         FutureCallback result = new FutureCallback();
188         rst(rstInfo, result);
189         if (rstInfo.getTimeout() > 0)
190             result.get(rstInfo.getTimeout(), rstInfo.getUnit());
191         else
192             result.get();
193     }
194 
195     @Override
196     public void rst(RstInfo rstInfo, Callback callback)
197     {
198         // SPEC v3, 2.2.2
199         if (goAwaySent.get())
200         {
201             complete(callback);
202         }
203         else
204         {
205             int streamId = rstInfo.getStreamId();
206             IStream stream = streams.get(streamId);
207             RstStreamFrame frame = new RstStreamFrame(version, streamId, rstInfo.getStreamStatus().getCode(version));
208             control(stream, frame, rstInfo.getTimeout(), rstInfo.getUnit(), callback);
209             if (stream != null)
210             {
211                 stream.process(frame);
212                 flusher.removeFrameBytesFromQueue(stream);
213                 removeStream(stream);
214             }
215         }
216     }
217 
218     @Override
219     public void settings(SettingsInfo settingsInfo) throws ExecutionException, InterruptedException, TimeoutException
220     {
221         FutureCallback result = new FutureCallback();
222         settings(settingsInfo, result);
223         if (settingsInfo.getTimeout() > 0)
224             result.get(settingsInfo.getTimeout(), settingsInfo.getUnit());
225         else
226             result.get();
227     }
228 
229     @Override
230     public void settings(SettingsInfo settingsInfo, Callback callback)
231     {
232         SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings());
233         control(null, frame, settingsInfo.getTimeout(), settingsInfo.getUnit(), callback);
234     }
235 
236     @Override
237     public PingResultInfo ping(PingInfo pingInfo) throws ExecutionException, InterruptedException, TimeoutException
238     {
239         FuturePromise<PingResultInfo> result = new FuturePromise<>();
240         ping(pingInfo, result);
241         if (pingInfo.getTimeout() > 0)
242             return result.get(pingInfo.getTimeout(), pingInfo.getUnit());
243         else
244             return result.get();
245     }
246 
247     @Override
248     public void ping(PingInfo pingInfo, Promise<PingResultInfo> promise)
249     {
250         int pingId = pingIds.getAndAdd(2);
251         PingInfoCallback pingInfoCallback = new PingInfoCallback(pingId, promise);
252         PingFrame frame = new PingFrame(version, pingId);
253         control(null, frame, pingInfo.getTimeout(), pingInfo.getUnit(), pingInfoCallback);
254     }
255 
256     @Override
257     public void goAway(GoAwayInfo goAwayInfo) throws ExecutionException, InterruptedException, TimeoutException
258     {
259         goAway(goAwayInfo, SessionStatus.OK);
260     }
261 
262     private void goAway(GoAwayInfo goAwayInfo, SessionStatus sessionStatus) throws ExecutionException, InterruptedException, TimeoutException
263     {
264         FutureCallback result = new FutureCallback();
265         goAway(sessionStatus, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), result);
266         if (goAwayInfo.getTimeout() > 0)
267             result.get(goAwayInfo.getTimeout(), goAwayInfo.getUnit());
268         else
269             result.get();
270     }
271 
272     @Override
273     public void goAway(GoAwayInfo goAwayInfo, Callback callback)
274     {
275         goAway(SessionStatus.OK, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), callback);
276     }
277 
278     private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Callback callback)
279     {
280         if (goAwaySent.compareAndSet(false, true))
281         {
282             if (!goAwayReceived.get())
283             {
284                 GoAwayFrame frame = new GoAwayFrame(version, lastStreamId.get(), sessionStatus.getCode());
285                 control(null, frame, timeout, unit, callback);
286                 return;
287             }
288         }
289         complete(callback);
290     }
291 
292     @Override
293     public Set<Stream> getStreams()
294     {
295         Set<Stream> result = new HashSet<>();
296         result.addAll(streams.values());
297         return result;
298     }
299 
300     @Override
301     public IStream getStream(int streamId)
302     {
303         return streams.get(streamId);
304     }
305 
306     @Override
307     public Object getAttribute(String key)
308     {
309         return attributes.get(key);
310     }
311 
312     @Override
313     public void setAttribute(String key, Object value)
314     {
315         attributes.put(key, value);
316     }
317 
318     @Override
319     public Object removeAttribute(String key)
320     {
321         return attributes.remove(key);
322     }
323 
324     @Override
325     public InetSocketAddress getLocalAddress()
326     {
327         return endPoint.getLocalAddress();
328     }
329 
330     @Override
331     public InetSocketAddress getRemoteAddress()
332     {
333         return endPoint.getRemoteAddress();
334     }
335 
336     @Override
337     public void onControlFrame(ControlFrame frame)
338     {
339         notifyIdle(idleListener, false);
340         try
341         {
342             LOG.debug("Processing {}", frame);
343 
344             if (goAwaySent.get())
345             {
346                 LOG.debug("Skipped processing of {}", frame);
347                 return;
348             }
349 
350             switch (frame.getType())
351             {
352                 case SYN_STREAM:
353                 {
354                     onSyn((SynStreamFrame)frame);
355                     break;
356                 }
357                 case SYN_REPLY:
358                 {
359                     onReply((SynReplyFrame)frame);
360                     break;
361                 }
362                 case RST_STREAM:
363                 {
364                     onRst((RstStreamFrame)frame);
365                     break;
366                 }
367                 case SETTINGS:
368                 {
369                     onSettings((SettingsFrame)frame);
370                     break;
371                 }
372                 case NOOP:
373                 {
374                     // Just ignore it
375                     break;
376                 }
377                 case PING:
378                 {
379                     onPing((PingFrame)frame);
380                     break;
381                 }
382                 case GO_AWAY:
383                 {
384                     onGoAway((GoAwayFrame)frame);
385                     break;
386                 }
387                 case HEADERS:
388                 {
389                     onHeaders((HeadersFrame)frame);
390                     break;
391                 }
392                 case WINDOW_UPDATE:
393                 {
394                     onWindowUpdate((WindowUpdateFrame)frame);
395                     break;
396                 }
397                 case CREDENTIAL:
398                 {
399                     onCredential((CredentialFrame)frame);
400                     break;
401                 }
402                 default:
403                 {
404                     throw new IllegalStateException();
405                 }
406             }
407         }
408         finally
409         {
410             notifyIdle(idleListener, true);
411         }
412     }
413 
414     @Override
415     public void onDataFrame(DataFrame frame, ByteBuffer data)
416     {
417         notifyIdle(idleListener, false);
418         try
419         {
420             LOG.debug("Processing {}, {} data bytes", frame, data.remaining());
421 
422             if (goAwaySent.get())
423             {
424                 LOG.debug("Skipped processing of {}", frame);
425                 return;
426             }
427 
428             int streamId = frame.getStreamId();
429             IStream stream = streams.get(streamId);
430             if (stream == null)
431             {
432                 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
433                 LOG.debug("Unknown stream {}", rstInfo);
434                 rst(rstInfo, new Callback.Adapter());
435             }
436             else
437             {
438                 processData(stream, frame, data);
439             }
440         }
441         finally
442         {
443             notifyIdle(idleListener, true);
444         }
445     }
446 
447     private void notifyIdle(IdleListener listener, boolean idle)
448     {
449         if (listener != null)
450             listener.onIdle(idle);
451     }
452 
453     private void processData(final IStream stream, DataFrame frame, ByteBuffer data)
454     {
455         ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data, frame.isClose())
456         {
457             @Override
458             public void consume(int delta)
459             {
460                 super.consume(delta);
461                 flowControlStrategy.onDataConsumed(StandardSession.this, stream, this, delta);
462             }
463         };
464         flowControlStrategy.onDataReceived(this, stream, dataInfo);
465         stream.process(dataInfo);
466         if (stream.isClosed())
467             removeStream(stream);
468     }
469 
470     @Override
471     public void onStreamException(StreamException x)
472     {
473         // TODO: rename to onFailure
474         notifyOnException(listener, x); //TODO: notify StreamFrameListener if exists?
475         rst(new RstInfo(x.getStreamId(), x.getStreamStatus()), new Callback.Adapter());
476     }
477 
478     @Override
479     public void onSessionException(SessionException x)
480     {
481         Throwable cause = x.getCause();
482         notifyOnException(listener, cause == null ? x : cause);
483         goAway(x.getSessionStatus(), 0, TimeUnit.SECONDS, new Callback.Adapter());
484     }
485 
486     private void onSyn(final SynStreamFrame frame)
487     {
488         IStream stream = createStream(frame, null, false, new Promise.Adapter<Stream>()
489         {
490             @Override
491             public void failed(Throwable x)
492             {
493                 LOG.debug("Received: {} but creating new Stream failed: {}", frame, x.getMessage());
494             }
495         });
496         if (stream != null)
497             processSyn(listener, stream, frame);
498     }
499 
500     private void processSyn(SessionFrameListener listener, IStream stream, SynStreamFrame frame)
501     {
502         stream.process(frame);
503         // Update the last stream id before calling the application (which may send a GO_AWAY)
504         updateLastStreamId(stream);
505         StreamFrameListener streamListener;
506         if (stream.isUnidirectional())
507         {
508             PushInfo pushInfo = new PushInfo(frame.getHeaders(), frame.isClose());
509             streamListener = notifyOnPush(stream.getAssociatedStream().getStreamFrameListener(), stream, pushInfo);
510         }
511         else
512         {
513             SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority());
514             streamListener = notifyOnSyn(listener, stream, synInfo);
515         }
516         stream.setStreamFrameListener(streamListener);
517         // The onSyn() listener may have sent a frame that closed the stream
518         if (stream.isClosed())
519             removeStream(stream);
520     }
521 
522     private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local, Promise<Stream> promise)
523     {
524         IStream associatedStream = streams.get(frame.getAssociatedStreamId());
525         IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream,
526                 scheduler, promise);
527         stream.setIdleTimeout(endPoint.getIdleTimeout());
528         flowControlStrategy.onNewStream(this, stream);
529 
530         stream.updateCloseState(frame.isClose(), local);
531         stream.setStreamFrameListener(listener);
532 
533         if (stream.isUnidirectional())
534         {
535             // Unidirectional streams are implicitly half closed
536             stream.updateCloseState(true, !local);
537             if (!stream.isClosed())
538                 stream.getAssociatedStream().associate(stream);
539         }
540 
541         int streamId = stream.getId();
542 
543         if (local)
544         {
545             while (true)
546             {
547                 int oldStreamCountValue = localStreamCount.get();
548                 int maxConcurrentStreams = maxConcurrentLocalStreams;
549                 if (maxConcurrentStreams > -1 && oldStreamCountValue >= maxConcurrentStreams)
550                 {
551                     String message = String.format("Max concurrent local streams (%d) exceeded.",
552                             maxConcurrentStreams);
553                     LOG.debug(message);
554                     promise.failed(new SPDYException(message));
555                     return null;
556                 }
557                 if (localStreamCount.compareAndSet(oldStreamCountValue, oldStreamCountValue + 1))
558                     break;
559             }
560         }
561 
562         if (streams.putIfAbsent(streamId, stream) != null)
563         {
564             String message = "Duplicate stream id " + streamId;
565             IllegalStateException duplicateIdException = new IllegalStateException(message);
566             promise.failed(duplicateIdException);
567             if (local)
568             {
569                 localStreamCount.decrementAndGet();
570                 throw duplicateIdException;
571             }
572             RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR);
573             LOG.debug("Duplicate stream, {}", rstInfo);
574             rst(rstInfo, new Callback.Adapter()); // We don't care (too much) if the reset fails.
575             return null;
576         }
577         else
578         {
579             LOG.debug("Created {}", stream);
580             notifyStreamCreated(stream);
581             return stream;
582         }
583     }
584 
585     private void notifyStreamCreated(IStream stream)
586     {
587         for (Listener listener : listeners)
588         {
589             if (listener instanceof StreamListener)
590             {
591                 try
592                 {
593                     ((StreamListener)listener).onStreamCreated(stream);
594                 }
595                 catch (Exception x)
596                 {
597                     LOG.info("Exception while notifying listener " + listener, x);
598                 }
599                 catch (Error x)
600                 {
601                     LOG.info("Exception while notifying listener " + listener, x);
602                     throw x;
603                 }
604             }
605         }
606     }
607 
608     private void removeStream(IStream stream)
609     {
610         if (stream.isUnidirectional())
611             stream.getAssociatedStream().disassociate(stream);
612 
613         IStream removed = streams.remove(stream.getId());
614         if (removed != null)
615         {
616             assert removed == stream;
617 
618             if (streamIds.get() % 2 == stream.getId() % 2)
619                 localStreamCount.decrementAndGet();
620 
621             LOG.debug("Removed {}", stream);
622             notifyStreamClosed(stream);
623         }
624     }
625 
626     private void notifyStreamClosed(IStream stream)
627     {
628         for (Listener listener : listeners)
629         {
630             if (listener instanceof StreamListener)
631             {
632                 try
633                 {
634                     ((StreamListener)listener).onStreamClosed(stream);
635                 }
636                 catch (Exception x)
637                 {
638                     LOG.info("Exception while notifying listener " + listener, x);
639                 }
640                 catch (Error x)
641                 {
642                     LOG.info("Exception while notifying listener " + listener, x);
643                     throw x;
644                 }
645             }
646         }
647     }
648 
649     private void onReply(SynReplyFrame frame)
650     {
651         int streamId = frame.getStreamId();
652         IStream stream = streams.get(streamId);
653         if (stream == null)
654         {
655             RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
656             LOG.debug("Unknown stream {}", rstInfo);
657             rst(rstInfo, new Callback.Adapter());
658         }
659         else
660         {
661             processReply(stream, frame);
662         }
663     }
664 
665     private void processReply(IStream stream, SynReplyFrame frame)
666     {
667         stream.process(frame);
668         if (stream.isClosed())
669             removeStream(stream);
670     }
671 
672     private void onRst(RstStreamFrame frame)
673     {
674         IStream stream = streams.get(frame.getStreamId());
675 
676         if (stream != null)
677             stream.process(frame);
678 
679         RstInfo rstInfo = new RstInfo(frame.getStreamId(), StreamStatus.from(frame.getVersion(), frame.getStatusCode()));
680         notifyOnRst(listener, rstInfo);
681 
682         if (stream != null)
683             removeStream(stream);
684     }
685 
686     private void onSettings(SettingsFrame frame)
687     {
688         Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
689         if (windowSizeSetting != null)
690         {
691             int windowSize = windowSizeSetting.value();
692             setWindowSize(windowSize);
693             LOG.debug("Updated session window size to {}", windowSize);
694         }
695         Settings.Setting maxConcurrentStreamsSetting = frame.getSettings().get(Settings.ID.MAX_CONCURRENT_STREAMS);
696         if (maxConcurrentStreamsSetting != null)
697         {
698             int maxConcurrentStreamsValue = maxConcurrentStreamsSetting.value();
699             maxConcurrentLocalStreams = maxConcurrentStreamsValue;
700             LOG.debug("Updated session maxConcurrentLocalStreams to {}", maxConcurrentStreamsValue);
701         }
702         SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted());
703         notifyOnSettings(listener, settingsInfo);
704     }
705 
706     private void onPing(PingFrame frame)
707     {
708         int pingId = frame.getPingId();
709         if (pingId % 2 == pingIds.get() % 2)
710         {
711             PingResultInfo pingResultInfo = new PingResultInfo(frame.getPingId());
712             notifyOnPing(listener, pingResultInfo);
713         }
714         else
715         {
716             control(null, frame, 0, TimeUnit.MILLISECONDS, new Callback.Adapter());
717         }
718     }
719 
720     private void onGoAway(GoAwayFrame frame)
721     {
722         if (goAwayReceived.compareAndSet(false, true))
723         {
724             //TODO: Find a better name for GoAwayResultInfo
725             GoAwayResultInfo goAwayResultInfo = new GoAwayResultInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode()));
726             notifyOnGoAway(listener, goAwayResultInfo);
727             // SPDY does not require to send back a response to a GO_AWAY.
728             // We notified the application of the last good stream id and
729             // tried our best to flush remaining data.
730         }
731     }
732 
733     private void onHeaders(HeadersFrame frame)
734     {
735         int streamId = frame.getStreamId();
736         IStream stream = streams.get(streamId);
737         if (stream == null)
738         {
739             RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
740             LOG.debug("Unknown stream, {}", rstInfo);
741             rst(rstInfo, new Callback.Adapter());
742         }
743         else
744         {
745             processHeaders(stream, frame);
746         }
747     }
748 
749     private void processHeaders(IStream stream, HeadersFrame frame)
750     {
751         stream.process(frame);
752         if (stream.isClosed())
753             removeStream(stream);
754     }
755 
756     private void onWindowUpdate(WindowUpdateFrame frame)
757     {
758         int streamId = frame.getStreamId();
759         IStream stream = streams.get(streamId);
760         flowControlStrategy.onWindowUpdate(this, stream, frame.getWindowDelta());
761         flusher.flush();
762     }
763 
764     private void onCredential(CredentialFrame frame)
765     {
766         LOG.warn("{} frame not yet supported", frame.getType());
767     }
768 
769     protected void close()
770     {
771         // Check for null to support tests
772         if (controller != null)
773             controller.close(false);
774     }
775 
776     private void notifyOnException(SessionFrameListener listener, Throwable x)
777     {
778         try
779         {
780             if (listener != null)
781             {
782                 LOG.debug("Invoking callback with {} on listener {}", x, listener);
783                 listener.onFailure(this, x);
784             }
785         }
786         catch (Exception xx)
787         {
788             LOG.info("Exception while notifying listener " + listener, xx);
789         }
790         catch (Error xx)
791         {
792             LOG.info("Exception while notifying listener " + listener, xx);
793             throw xx;
794         }
795     }
796 
797     private StreamFrameListener notifyOnPush(StreamFrameListener listener, Stream stream, PushInfo pushInfo)
798     {
799         try
800         {
801             if (listener == null)
802                 return null;
803             LOG.debug("Invoking callback with {} on listener {}", pushInfo, listener);
804             return listener.onPush(stream, pushInfo);
805         }
806         catch (Exception x)
807         {
808             LOG.info("Exception while notifying listener " + listener, x);
809             return null;
810         }
811         catch (Error x)
812         {
813             LOG.info("Exception while notifying listener " + listener, x);
814             throw x;
815         }
816     }
817 
818     private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo)
819     {
820         try
821         {
822             if (listener == null)
823                 return null;
824             LOG.debug("Invoking callback with {} on listener {}", synInfo, listener);
825             return listener.onSyn(stream, synInfo);
826         }
827         catch (Exception x)
828         {
829             LOG.info("Exception while notifying listener " + listener, x);
830             return null;
831         }
832         catch (Error x)
833         {
834             LOG.info("Exception while notifying listener " + listener, x);
835             throw x;
836         }
837     }
838 
839     private void notifyOnRst(SessionFrameListener listener, RstInfo rstInfo)
840     {
841         try
842         {
843             if (listener != null)
844             {
845                 LOG.debug("Invoking callback with {} on listener {}", rstInfo, listener);
846                 listener.onRst(this, rstInfo);
847             }
848         }
849         catch (Exception x)
850         {
851             LOG.info("Exception while notifying listener " + listener, x);
852         }
853         catch (Error x)
854         {
855             LOG.info("Exception while notifying listener " + listener, x);
856             throw x;
857         }
858     }
859 
860     private void notifyOnSettings(SessionFrameListener listener, SettingsInfo settingsInfo)
861     {
862         try
863         {
864             if (listener != null)
865             {
866                 LOG.debug("Invoking callback with {} on listener {}", settingsInfo, listener);
867                 listener.onSettings(this, settingsInfo);
868             }
869         }
870         catch (Exception x)
871         {
872             LOG.info("Exception while notifying listener " + listener, x);
873         }
874         catch (Error x)
875         {
876             LOG.info("Exception while notifying listener " + listener, x);
877             throw x;
878         }
879     }
880 
881     private void notifyOnPing(SessionFrameListener listener, PingResultInfo pingResultInfo)
882     {
883         try
884         {
885             if (listener != null)
886             {
887                 LOG.debug("Invoking callback with {} on listener {}", pingResultInfo, listener);
888                 listener.onPing(this, pingResultInfo);
889             }
890         }
891         catch (Exception x)
892         {
893             LOG.info("Exception while notifying listener " + listener, x);
894         }
895         catch (Error x)
896         {
897             LOG.info("Exception while notifying listener " + listener, x);
898             throw x;
899         }
900     }
901 
902     private void notifyOnGoAway(SessionFrameListener listener, GoAwayResultInfo goAwayResultInfo)
903     {
904         try
905         {
906             if (listener != null)
907             {
908                 LOG.debug("Invoking callback with {} on listener {}", goAwayResultInfo, listener);
909                 listener.onGoAway(this, goAwayResultInfo);
910             }
911         }
912         catch (Exception x)
913         {
914             LOG.info("Exception while notifying listener " + listener, x);
915         }
916         catch (Error x)
917         {
918             LOG.info("Exception while notifying listener " + listener, x);
919             throw x;
920         }
921     }
922 
923     @Override
924     public void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
925     {
926         generateAndEnqueueControlFrame(stream, frame, timeout, unit, callback);
927     }
928 
929     private void generateAndEnqueueControlFrame(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
930     {
931         try
932         {
933             // Synchronization is necessary, since we may have concurrent replies
934             // and those needs to be generated and enqueued atomically in order
935             // to maintain a correct compression context
936             synchronized (this)
937             {
938                 ByteBuffer buffer = generator.control(frame);
939                 LOG.debug("Queuing {} on {}", frame, stream);
940                 ControlFrameBytes frameBytes = new ControlFrameBytes(stream, callback, frame, buffer);
941                 if (timeout > 0)
942                     frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
943 
944                 // Special handling for PING frames, they must be sent as soon as possible
945                 if (ControlFrameType.PING == frame.getType())
946                     flusher.prepend(frameBytes);
947                 else
948                     flusher.append(frameBytes);
949             }
950         }
951         catch (Exception x)
952         {
953             notifyCallbackFailed(callback, x);
954         }
955     }
956 
957     private void updateLastStreamId(IStream stream)
958     {
959         int streamId = stream.getId();
960         if (streamId % 2 != streamIds.get() % 2)
961             Atomics.updateMax(lastStreamId, streamId);
962     }
963 
964     @Override
965     public void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Callback callback)
966     {
967         LOG.debug("Queuing {} on {}", dataInfo, stream);
968         DataFrameBytes frameBytes = new DataFrameBytes(stream, callback, dataInfo);
969         if (timeout > 0)
970             frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
971         flusher.append(frameBytes);
972     }
973 
974     @Override
975     public void shutdown()
976     {
977         FrameBytes frameBytes = new CloseFrameBytes();
978         flusher.append(frameBytes);
979     }
980 
981     private void complete(final Callback callback)
982     {
983         callback.succeeded();
984     }
985 
986     private void notifyCallbackFailed(Callback callback, Throwable x)
987     {
988         try
989         {
990             if (callback != null)
991                 callback.failed(x);
992         }
993         catch (Exception xx)
994         {
995             LOG.info("Exception while notifying callback " + callback, xx);
996         }
997         catch (Error xx)
998         {
999             LOG.info("Exception while notifying callback " + callback, xx);
1000             throw xx;
1001         }
1002     }
1003 
1004     public int getWindowSize()
1005     {
1006         return flowControlStrategy.getWindowSize(this);
1007     }
1008 
1009     public void setWindowSize(int initialWindowSize)
1010     {
1011         flowControlStrategy.setWindowSize(this, initialWindowSize);
1012     }
1013 
1014     @Override
1015     public String toString()
1016     {
1017         return String.format("%s@%x{v%d,queueSize=%d,windowSize=%d,streams=%d}", getClass().getSimpleName(),
1018                 hashCode(), version, flusher.getQueueSize(), getWindowSize(), streams.size());
1019     }
1020 
1021     @Override
1022     public String dump()
1023     {
1024         return ContainerLifeCycle.dump(this);
1025     }
1026 
1027     @Override
1028     public void dump(Appendable out, String indent) throws IOException
1029     {
1030         ContainerLifeCycle.dumpObject(out, this);
1031         ContainerLifeCycle.dump(out, indent, Collections.singletonList(controller), streams.values());
1032     }
1033 
1034     public interface FrameBytes extends Comparable<FrameBytes>, Callback
1035     {
1036         public IStream getStream();
1037 
1038         public abstract ByteBuffer getByteBuffer();
1039     }
1040 
1041     abstract class AbstractFrameBytes implements FrameBytes, Runnable
1042     {
1043         private final IStream stream;
1044         private final Callback callback;
1045         protected volatile Scheduler.Task task;
1046 
1047         protected AbstractFrameBytes(IStream stream, Callback callback)
1048         {
1049             this.stream = stream;
1050             this.callback = Objects.requireNonNull(callback);
1051         }
1052 
1053         @Override
1054         public IStream getStream()
1055         {
1056             return stream;
1057         }
1058 
1059         @Override
1060         public int compareTo(FrameBytes that)
1061         {
1062             // FrameBytes may have or not have a related stream (for example, PING do not have a related stream)
1063             // FrameBytes without related streams have higher priority
1064             IStream thisStream = getStream();
1065             IStream thatStream = that.getStream();
1066             if (thisStream == null)
1067                 return thatStream == null ? 0 : -1;
1068             if (thatStream == null)
1069                 return 1;
1070             // If this.stream.priority > that.stream.priority => this.stream has less priority than that.stream
1071             return thatStream.getPriority() - thisStream.getPriority();
1072         }
1073 
1074         private void cancelTask()
1075         {
1076             Scheduler.Task task = this.task;
1077             if (task != null)
1078                 task.cancel();
1079         }
1080 
1081         @Override
1082         public void run()
1083         {
1084             close();
1085             failed(new InterruptedByTimeoutException());
1086         }
1087 
1088         @Override
1089         public void succeeded()
1090         {
1091             cancelTask();
1092             StandardSession.this.complete(callback);
1093         }
1094 
1095         @Override
1096         public void failed(Throwable x)
1097         {
1098             cancelTask();
1099             notifyCallbackFailed(callback, x);
1100         }
1101     }
1102 
1103     class ControlFrameBytes extends AbstractFrameBytes
1104     {
1105         private final ControlFrame frame;
1106         private final ByteBuffer buffer;
1107 
1108         private ControlFrameBytes(IStream stream, Callback callback, ControlFrame frame, ByteBuffer buffer)
1109         {
1110             super(stream, callback);
1111             this.frame = frame;
1112             this.buffer = buffer;
1113         }
1114 
1115         @Override
1116         public ByteBuffer getByteBuffer()
1117         {
1118             return buffer;
1119         }
1120 
1121         @Override
1122         public void succeeded()
1123         {
1124             bufferPool.release(buffer);
1125 
1126             super.succeeded();
1127 
1128             if (frame.getType() == ControlFrameType.GO_AWAY)
1129             {
1130                 // After sending a GO_AWAY we need to hard close the connection.
1131                 // Recipients will know the last good stream id and act accordingly.
1132                 close();
1133             }
1134             IStream stream = getStream();
1135             if (stream != null && stream.isClosed())
1136                 removeStream(stream);
1137         }
1138 
1139         @Override
1140         public String toString()
1141         {
1142             return frame.toString();
1143         }
1144     }
1145 
1146     private class DataFrameBytes extends AbstractFrameBytes
1147     {
1148         private final DataInfo dataInfo;
1149         private int size;
1150         private volatile ByteBuffer buffer;
1151 
1152         private DataFrameBytes(IStream stream, Callback handler, DataInfo dataInfo)
1153         {
1154             super(stream, handler);
1155             this.dataInfo = dataInfo;
1156         }
1157 
1158         @Override
1159         public ByteBuffer getByteBuffer()
1160         {
1161             try
1162             {
1163                 IStream stream = getStream();
1164                 int windowSize = stream.getWindowSize();
1165                 if (windowSize <= 0)
1166                     return null;
1167 
1168                 size = dataInfo.available();
1169                 if (size > windowSize)
1170                     size = windowSize;
1171 
1172                 buffer = generator.data(stream.getId(), size, dataInfo);
1173                 return buffer;
1174             }
1175             catch (Throwable x)
1176             {
1177                 failed(x);
1178                 return null;
1179             }
1180         }
1181 
1182         @Override
1183         public void succeeded()
1184         {
1185             bufferPool.release(buffer);
1186             IStream stream = getStream();
1187             dataInfo.consume(size);
1188             flowControlStrategy.updateWindow(StandardSession.this, stream, -size);
1189             if (dataInfo.available() > 0)
1190             {
1191                 // We have written a frame out of this DataInfo, but there is more to write.
1192                 // We need to keep the correct ordering of frames, to avoid that another
1193                 // DataInfo for the same stream is written before this one is finished.
1194                 flusher.prepend(this);
1195             }
1196             else
1197             {
1198                 super.succeeded();
1199                 stream.updateCloseState(dataInfo.isClose(), true);
1200                 if (stream.isClosed())
1201                     removeStream(stream);
1202             }
1203         }
1204 
1205         @Override
1206         public String toString()
1207         {
1208             return String.format("DATA bytes @%x available=%d consumed=%d on %s", dataInfo.hashCode(), dataInfo.available(), dataInfo.consumed(), getStream());
1209         }
1210     }
1211 
1212     private class CloseFrameBytes extends AbstractFrameBytes
1213     {
1214         private CloseFrameBytes()
1215         {
1216             super(null, new Callback.Adapter());
1217         }
1218 
1219         @Override
1220         public ByteBuffer getByteBuffer()
1221         {
1222             return BufferUtil.EMPTY_BUFFER;
1223         }
1224 
1225         @Override
1226         public void succeeded()
1227         {
1228             super.succeeded();
1229             close();
1230         }
1231     }
1232 
1233     private static class PingInfoCallback extends PingResultInfo implements Callback
1234     {
1235         private final Promise<PingResultInfo> promise;
1236 
1237         public PingInfoCallback(int pingId, Promise<PingResultInfo> promise)
1238         {
1239             super(pingId);
1240             this.promise = promise;
1241         }
1242 
1243         @Override
1244         public void succeeded()
1245         {
1246             if (promise != null)
1247                 promise.succeeded(this);
1248         }
1249 
1250         @Override
1251         public void failed(Throwable x)
1252         {
1253             if (promise != null)
1254                 promise.failed(x);
1255         }
1256     }
1257 }