View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.nio.ByteBuffer;
24  import java.nio.channels.InterruptedByTimeoutException;
25  import java.util.Collections;
26  import java.util.HashSet;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Objects;
30  import java.util.Set;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.ConcurrentMap;
33  import java.util.concurrent.CopyOnWriteArrayList;
34  import java.util.concurrent.ExecutionException;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.TimeoutException;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicInteger;
39  
40  import org.eclipse.jetty.io.ByteBufferPool;
41  import org.eclipse.jetty.io.EndPoint;
42  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
43  import org.eclipse.jetty.spdy.api.DataInfo;
44  import org.eclipse.jetty.spdy.api.GoAwayInfo;
45  import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
46  import org.eclipse.jetty.spdy.api.PingInfo;
47  import org.eclipse.jetty.spdy.api.PingResultInfo;
48  import org.eclipse.jetty.spdy.api.PushInfo;
49  import org.eclipse.jetty.spdy.api.RstInfo;
50  import org.eclipse.jetty.spdy.api.SPDYException;
51  import org.eclipse.jetty.spdy.api.Session;
52  import org.eclipse.jetty.spdy.api.SessionFrameListener;
53  import org.eclipse.jetty.spdy.api.SessionStatus;
54  import org.eclipse.jetty.spdy.api.Settings;
55  import org.eclipse.jetty.spdy.api.SettingsInfo;
56  import org.eclipse.jetty.spdy.api.Stream;
57  import org.eclipse.jetty.spdy.api.StreamFrameListener;
58  import org.eclipse.jetty.spdy.api.StreamStatus;
59  import org.eclipse.jetty.spdy.api.SynInfo;
60  import org.eclipse.jetty.spdy.frames.ControlFrame;
61  import org.eclipse.jetty.spdy.frames.ControlFrameType;
62  import org.eclipse.jetty.spdy.frames.CredentialFrame;
63  import org.eclipse.jetty.spdy.frames.DataFrame;
64  import org.eclipse.jetty.spdy.frames.GoAwayFrame;
65  import org.eclipse.jetty.spdy.frames.HeadersFrame;
66  import org.eclipse.jetty.spdy.frames.PingFrame;
67  import org.eclipse.jetty.spdy.frames.RstStreamFrame;
68  import org.eclipse.jetty.spdy.frames.SettingsFrame;
69  import org.eclipse.jetty.spdy.frames.SynReplyFrame;
70  import org.eclipse.jetty.spdy.frames.SynStreamFrame;
71  import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
72  import org.eclipse.jetty.spdy.generator.Generator;
73  import org.eclipse.jetty.spdy.parser.Parser;
74  import org.eclipse.jetty.util.Atomics;
75  import org.eclipse.jetty.util.BufferUtil;
76  import org.eclipse.jetty.util.Callback;
77  import org.eclipse.jetty.util.FutureCallback;
78  import org.eclipse.jetty.util.FuturePromise;
79  import org.eclipse.jetty.util.Promise;
80  import org.eclipse.jetty.util.component.ContainerLifeCycle;
81  import org.eclipse.jetty.util.component.Dumpable;
82  import org.eclipse.jetty.util.log.Log;
83  import org.eclipse.jetty.util.log.Logger;
84  import org.eclipse.jetty.util.thread.Scheduler;
85  
86  public class StandardSession implements ISession, Parser.Listener, Dumpable
87  {
88      private static final Logger LOG = Log.getLogger(Session.class); // NOTE(review): logger is keyed on the Session API interface, not StandardSession - confirm intentional
89  
90      private final Flusher flusher; // built from the controller in the constructor; flushed on WINDOW_UPDATE, purged of a stream's frames by rst()
91      private final Map<String, Object> attributes = new ConcurrentHashMap<>(); // backing store for get/set/removeAttribute
92      private final List<Listener> listeners = new CopyOnWriteArrayList<>(); // registered listeners; StreamListener instances receive stream created/closed events
93      private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>(); // registered streams keyed by stream id
94      private final ByteBufferPool bufferPool;
95      private final Scheduler scheduler; // handed to every StandardStream built by createStream()
96      private final short version; // SPDY version stamped on the frames this session builds
97      private final Controller controller; // closed by close(); may be null in tests
98      private final EndPoint endPoint; // source of local/remote addresses and of the idle timeout copied onto new streams
99      private final IdleListener idleListener; // told non-idle/idle around the processing of each inbound frame; may be null
100     private final AtomicInteger streamIds; // next locally-initiated stream id; advanced by 2 per syn()
101     private final AtomicInteger pingIds; // next locally-initiated ping id; advanced by 2 per ping()
102     private final SessionFrameListener listener; // application listener for session-level events; null-checked before each notification
103     private final Generator generator;
104     private final AtomicBoolean goAwaySent = new AtomicBoolean(); // flipped by the first local goAway(); afterwards inbound frames are skipped and rst() becomes a no-op
105     private final AtomicBoolean goAwayReceived = new AtomicBoolean(); // flipped when the first GO_AWAY frame arrives
106     private final AtomicInteger lastStreamId = new AtomicInteger(); // value reported in outgoing GO_AWAY frames
107     private final AtomicInteger localStreamCount = new AtomicInteger(0); // open locally-initiated streams, checked against maxConcurrentLocalStreams
108     private final FlowControlStrategy flowControlStrategy; // notified of new streams, received/consumed data and window updates
109     private volatile int maxConcurrentLocalStreams = -1; // -1 means no limit; overwritten from an inbound MAX_CONCURRENT_STREAMS setting
110 
111     public StandardSession(short version, ByteBufferPool bufferPool, Scheduler scheduler,
112                            Controller controller, EndPoint endPoint, IdleListener idleListener, int initialStreamId,
113                            SessionFrameListener listener, Generator generator, FlowControlStrategy flowControlStrategy)
114     {
115         // TODO this should probably be an aggregate lifecycle
116         this.version = version;
117         this.bufferPool = bufferPool;
118         this.scheduler = scheduler;
119         this.controller = controller;
120         this.endPoint = endPoint;
121         this.idleListener = idleListener;
122         this.streamIds = new AtomicInteger(initialStreamId);
123         this.pingIds = new AtomicInteger(initialStreamId);
124         this.listener = listener;
125         this.generator = generator;
126         this.flowControlStrategy = flowControlStrategy;
127         this.flusher = new Flusher(controller);
128     }
129 
130     @Override
131     public short getVersion()
132     {
133         return version;
134     }
135 
136     @Override
137     public void addListener(Listener listener)
138     {
139         listeners.add(listener);
140     }
141 
142     @Override
143     public void removeListener(Listener listener)
144     {
145         listeners.remove(listener);
146     }
147 
148     @Override
149     public Stream syn(SynInfo synInfo, StreamFrameListener listener) throws ExecutionException, InterruptedException, TimeoutException
150     {
151         FuturePromise<Stream> result = new FuturePromise<>();
152         syn(synInfo, listener, result);
153         if (synInfo.getTimeout() > 0)
154             return result.get(synInfo.getTimeout(), synInfo.getUnit());
155         else
156             return result.get();
157     }
158 
159     @Override
160     public void syn(SynInfo synInfo, StreamFrameListener listener, Promise<Stream> promise)
161     {
162         // Synchronization is necessary.
163         // SPEC v3, 2.3.1 requires that the stream creation be monotonically crescent
164         // so we cannot allow thread1 to create stream1 and thread2 create stream3 and
165         // have stream3 hit the network before stream1, not only to comply with the spec
166         // but also because the compression context for the headers would be wrong, as the
167         // frame with a compression history will come before the first compressed frame.
168         int associatedStreamId = 0;
169         if (synInfo instanceof PushSynInfo)
170             associatedStreamId = ((PushSynInfo)synInfo).getAssociatedStreamId();
171 
172         synchronized (this)
173         {
174             int streamId = streamIds.getAndAdd(2);
175             // TODO: for SPDYv3 we need to support the "slot" argument
176             SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, associatedStreamId, synInfo.getPriority(), (short)0, synInfo.getHeaders());
177             IStream stream = createStream(synStream, listener, true, promise);
178             if (stream == null)
179                 return;
180             generateAndEnqueueControlFrame(stream, synStream, synInfo.getTimeout(), synInfo.getUnit(), stream);
181         }
182     }
183 
184     @Override
185     public void rst(RstInfo rstInfo) throws InterruptedException, ExecutionException, TimeoutException
186     {
187         FutureCallback result = new FutureCallback();
188         rst(rstInfo, result);
189         if (rstInfo.getTimeout() > 0)
190             result.get(rstInfo.getTimeout(), rstInfo.getUnit());
191         else
192             result.get();
193     }
194 
195     @Override
196     public void rst(RstInfo rstInfo, Callback callback)
197     {
198         // SPEC v3, 2.2.2
199         if (goAwaySent.get())
200         {
201             complete(callback);
202         }
203         else
204         {
205             int streamId = rstInfo.getStreamId();
206             IStream stream = streams.get(streamId);
207             RstStreamFrame frame = new RstStreamFrame(version, streamId, rstInfo.getStreamStatus().getCode(version));
208             control(stream, frame, rstInfo.getTimeout(), rstInfo.getUnit(), callback);
209             if (stream != null)
210             {
211                 stream.process(frame);
212                 flusher.removeFrameBytesFromQueue(stream);
213                 removeStream(stream);
214             }
215         }
216     }
217 
218     @Override
219     public void settings(SettingsInfo settingsInfo) throws ExecutionException, InterruptedException, TimeoutException
220     {
221         FutureCallback result = new FutureCallback();
222         settings(settingsInfo, result);
223         if (settingsInfo.getTimeout() > 0)
224             result.get(settingsInfo.getTimeout(), settingsInfo.getUnit());
225         else
226             result.get();
227     }
228 
229     @Override
230     public void settings(SettingsInfo settingsInfo, Callback callback)
231     {
232         SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings());
233         control(null, frame, settingsInfo.getTimeout(), settingsInfo.getUnit(), callback);
234     }
235 
236     @Override
237     public PingResultInfo ping(PingInfo pingInfo) throws ExecutionException, InterruptedException, TimeoutException
238     {
239         FuturePromise<PingResultInfo> result = new FuturePromise<>();
240         ping(pingInfo, result);
241         if (pingInfo.getTimeout() > 0)
242             return result.get(pingInfo.getTimeout(), pingInfo.getUnit());
243         else
244             return result.get();
245     }
246 
247     @Override
248     public void ping(PingInfo pingInfo, Promise<PingResultInfo> promise)
249     {
250         int pingId = pingIds.getAndAdd(2);
251         PingInfoCallback pingInfoCallback = new PingInfoCallback(pingId, promise);
252         PingFrame frame = new PingFrame(version, pingId);
253         control(null, frame, pingInfo.getTimeout(), pingInfo.getUnit(), pingInfoCallback);
254     }
255 
256     @Override
257     public void goAway(GoAwayInfo goAwayInfo) throws ExecutionException, InterruptedException, TimeoutException
258     {
259         goAway(goAwayInfo, SessionStatus.OK);
260     }
261 
262     private void goAway(GoAwayInfo goAwayInfo, SessionStatus sessionStatus) throws ExecutionException, InterruptedException, TimeoutException
263     {
264         FutureCallback result = new FutureCallback();
265         goAway(sessionStatus, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), result);
266         if (goAwayInfo.getTimeout() > 0)
267             result.get(goAwayInfo.getTimeout(), goAwayInfo.getUnit());
268         else
269             result.get();
270     }
271 
272     @Override
273     public void goAway(GoAwayInfo goAwayInfo, Callback callback)
274     {
275         goAway(SessionStatus.OK, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), callback);
276     }
277 
278     private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Callback callback)
279     {
280         if (goAwaySent.compareAndSet(false, true))
281         {
282             if (!goAwayReceived.get())
283             {
284                 GoAwayFrame frame = new GoAwayFrame(version, lastStreamId.get(), sessionStatus.getCode());
285                 control(null, frame, timeout, unit, callback);
286                 return;
287             }
288         }
289         complete(callback);
290     }
291 
292     @Override
293     public Set<Stream> getStreams()
294     {
295         Set<Stream> result = new HashSet<>();
296         result.addAll(streams.values());
297         return result;
298     }
299 
300     @Override
301     public IStream getStream(int streamId)
302     {
303         return streams.get(streamId);
304     }
305 
306     @Override
307     public Object getAttribute(String key)
308     {
309         return attributes.get(key);
310     }
311 
312     @Override
313     public void setAttribute(String key, Object value)
314     {
315         attributes.put(key, value);
316     }
317 
318     @Override
319     public Object removeAttribute(String key)
320     {
321         return attributes.remove(key);
322     }
323 
324     @Override
325     public InetSocketAddress getLocalAddress()
326     {
327         return endPoint.getLocalAddress();
328     }
329 
330     @Override
331     public InetSocketAddress getRemoteAddress()
332     {
333         return endPoint.getRemoteAddress();
334     }
335 
336     @Override
337     public void onControlFrame(ControlFrame frame)
338     {
339         notifyIdle(idleListener, false);
340         try
341         {
342             if (LOG.isDebugEnabled())
343                 LOG.debug("Processing {}", frame);
344 
345             if (goAwaySent.get())
346             {
347                 if (LOG.isDebugEnabled())
348                     LOG.debug("Skipped processing of {}", frame);
349                 return;
350             }
351 
352             switch (frame.getType())
353             {
354                 case SYN_STREAM:
355                 {
356                     onSyn((SynStreamFrame)frame);
357                     break;
358                 }
359                 case SYN_REPLY:
360                 {
361                     onReply((SynReplyFrame)frame);
362                     break;
363                 }
364                 case RST_STREAM:
365                 {
366                     onRst((RstStreamFrame)frame);
367                     break;
368                 }
369                 case SETTINGS:
370                 {
371                     onSettings((SettingsFrame)frame);
372                     break;
373                 }
374                 case NOOP:
375                 {
376                     // Just ignore it
377                     break;
378                 }
379                 case PING:
380                 {
381                     onPing((PingFrame)frame);
382                     break;
383                 }
384                 case GO_AWAY:
385                 {
386                     onGoAway((GoAwayFrame)frame);
387                     break;
388                 }
389                 case HEADERS:
390                 {
391                     onHeaders((HeadersFrame)frame);
392                     break;
393                 }
394                 case WINDOW_UPDATE:
395                 {
396                     onWindowUpdate((WindowUpdateFrame)frame);
397                     break;
398                 }
399                 case CREDENTIAL:
400                 {
401                     onCredential((CredentialFrame)frame);
402                     break;
403                 }
404                 default:
405                 {
406                     throw new IllegalStateException();
407                 }
408             }
409         }
410         finally
411         {
412             notifyIdle(idleListener, true);
413         }
414     }
415 
416     @Override
417     public void onDataFrame(DataFrame frame, ByteBuffer data)
418     {
419         notifyIdle(idleListener, false);
420         try
421         {
422             if (LOG.isDebugEnabled())
423                 LOG.debug("Processing {}, {} data bytes", frame, data.remaining());
424 
425             if (goAwaySent.get())
426             {
427                 if (LOG.isDebugEnabled())
428                     LOG.debug("Skipped processing of {}", frame);
429                 return;
430             }
431 
432             int streamId = frame.getStreamId();
433             IStream stream = streams.get(streamId);
434             if (stream == null)
435             {
436                 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
437                 if (LOG.isDebugEnabled())
438                     LOG.debug("Unknown stream {}", rstInfo);
439                 rst(rstInfo, Callback.Adapter.INSTANCE);
440             }
441             else
442             {
443                 processData(stream, frame, data);
444             }
445         }
446         finally
447         {
448             notifyIdle(idleListener, true);
449         }
450     }
451 
452     private void notifyIdle(IdleListener listener, boolean idle)
453     {
454         if (listener != null)
455             listener.onIdle(idle);
456     }
457 
458     private void processData(final IStream stream, DataFrame frame, ByteBuffer data)
459     {
460         ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data, frame.isClose())
461         {
462             @Override
463             public void consume(int delta)
464             {
465                 super.consume(delta);
466                 flowControlStrategy.onDataConsumed(StandardSession.this, stream, this, delta);
467             }
468         };
469         flowControlStrategy.onDataReceived(this, stream, dataInfo);
470         stream.process(dataInfo);
471         if (stream.isClosed())
472             removeStream(stream);
473     }
474 
475     @Override
476     public void onStreamException(StreamException x)
477     {
478         notifyOnFailure(listener, x); // TODO: notify StreamFrameListener if exists?
479         rst(new RstInfo(x.getStreamId(), x.getStreamStatus()), Callback.Adapter.INSTANCE);
480     }
481 
482     @Override
483     public void onSessionException(SessionException x)
484     {
485         Throwable cause = x.getCause();
486         notifyOnFailure(listener, cause == null ? x : cause);
487         goAway(x.getSessionStatus(), 0, TimeUnit.SECONDS, Callback.Adapter.INSTANCE);
488     }
489 
490     private void onSyn(final SynStreamFrame frame)
491     {
492         IStream stream = createStream(frame, null, false, new Promise.Adapter<Stream>()
493         {
494             @Override
495             public void failed(Throwable x)
496             {
497                 LOG.debug("Received: {} but creating new Stream failed: {}", frame, x.getMessage());
498             }
499         });
500         if (stream != null)
501             processSyn(listener, stream, frame);
502     }
503 
504     private void processSyn(SessionFrameListener listener, IStream stream, SynStreamFrame frame)
505     {
506         stream.process(frame);
507         // Update the last stream id before calling the application (which may send a GO_AWAY)
508         updateLastStreamId(stream);
509         StreamFrameListener streamListener;
510         if (stream.isUnidirectional())
511         {
512             PushInfo pushInfo = new PushInfo(frame.getHeaders(), frame.isClose());
513             streamListener = notifyOnPush(stream.getAssociatedStream().getStreamFrameListener(), stream, pushInfo);
514         }
515         else
516         {
517             SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority());
518             streamListener = notifyOnSyn(listener, stream, synInfo);
519         }
520         stream.setStreamFrameListener(streamListener);
521         // The onSyn() listener may have sent a frame that closed the stream
522         if (stream.isClosed())
523             removeStream(stream);
524     }
525 
526     private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local, Promise<Stream> promise)
527     {
528         IStream associatedStream = streams.get(frame.getAssociatedStreamId());
529         IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream,
530                 scheduler, promise);
531         stream.setIdleTimeout(endPoint.getIdleTimeout());
532         flowControlStrategy.onNewStream(this, stream);
533 
534         stream.updateCloseState(frame.isClose(), local);
535         stream.setStreamFrameListener(listener);
536 
537         if (stream.isUnidirectional())
538         {
539             // Unidirectional streams are implicitly half closed
540             stream.updateCloseState(true, !local);
541             if (!stream.isClosed())
542                 stream.getAssociatedStream().associate(stream);
543         }
544 
545         int streamId = stream.getId();
546 
547         if (local)
548         {
549             while (true)
550             {
551                 int oldStreamCountValue = localStreamCount.get();
552                 int maxConcurrentStreams = maxConcurrentLocalStreams;
553                 if (maxConcurrentStreams > -1 && oldStreamCountValue >= maxConcurrentStreams)
554                 {
555                     String message = String.format("Max concurrent local streams (%d) exceeded.",
556                             maxConcurrentStreams);
557                     LOG.debug(message);
558                     promise.failed(new SPDYException(message));
559                     return null;
560                 }
561                 if (localStreamCount.compareAndSet(oldStreamCountValue, oldStreamCountValue + 1))
562                     break;
563             }
564         }
565 
566         if (streams.putIfAbsent(streamId, stream) != null)
567         {
568             String message = "Duplicate stream id " + streamId;
569             IllegalStateException duplicateIdException = new IllegalStateException(message);
570             promise.failed(duplicateIdException);
571             if (local)
572             {
573                 localStreamCount.decrementAndGet();
574                 throw duplicateIdException;
575             }
576             RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR);
577             if (LOG.isDebugEnabled())
578                 LOG.debug("Duplicate stream, {}", rstInfo);
579             rst(rstInfo, Callback.Adapter.INSTANCE); // We don't care (too much) if the reset fails.
580             return null;
581         }
582         else
583         {
584             if (LOG.isDebugEnabled())
585                 LOG.debug("Created {}", stream);
586             notifyStreamCreated(stream);
587             return stream;
588         }
589     }
590 
591     private void notifyStreamCreated(IStream stream)
592     {
593         for (Listener listener : listeners)
594         {
595             if (listener instanceof StreamListener)
596             {
597                 try
598                 {
599                     ((StreamListener)listener).onStreamCreated(stream);
600                 }
601                 catch (Exception x)
602                 {
603                     LOG.info("Exception while notifying listener " + listener, x);
604                 }
605                 catch (Error x)
606                 {
607                     LOG.info("Exception while notifying listener " + listener, x);
608                     throw x;
609                 }
610             }
611         }
612     }
613 
614     private void removeStream(IStream stream)
615     {
616         if (stream.isUnidirectional())
617             stream.getAssociatedStream().disassociate(stream);
618 
619         IStream removed = streams.remove(stream.getId());
620         if (removed != null)
621         {
622             assert removed == stream;
623 
624             if (streamIds.get() % 2 == stream.getId() % 2)
625                 localStreamCount.decrementAndGet();
626 
627             if (LOG.isDebugEnabled())
628                 LOG.debug("Removed {}", stream);
629             notifyStreamClosed(stream);
630         }
631     }
632 
633     private void notifyStreamClosed(IStream stream)
634     {
635         for (Listener listener : listeners)
636         {
637             if (listener instanceof StreamListener)
638             {
639                 try
640                 {
641                     ((StreamListener)listener).onStreamClosed(stream);
642                 }
643                 catch (Exception x)
644                 {
645                     LOG.info("Exception while notifying listener " + listener, x);
646                 }
647                 catch (Error x)
648                 {
649                     LOG.info("Exception while notifying listener " + listener, x);
650                     throw x;
651                 }
652             }
653         }
654     }
655 
656     private void onReply(SynReplyFrame frame)
657     {
658         int streamId = frame.getStreamId();
659         IStream stream = streams.get(streamId);
660         if (stream == null)
661         {
662             RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
663             if (LOG.isDebugEnabled())
664                 LOG.debug("Unknown stream {}", rstInfo);
665             rst(rstInfo, Callback.Adapter.INSTANCE);
666         }
667         else
668         {
669             processReply(stream, frame);
670         }
671     }
672 
673     private void processReply(IStream stream, SynReplyFrame frame)
674     {
675         stream.process(frame);
676         if (stream.isClosed())
677             removeStream(stream);
678     }
679 
680     private void onRst(RstStreamFrame frame)
681     {
682         IStream stream = streams.get(frame.getStreamId());
683 
684         if (stream != null)
685             stream.process(frame);
686 
687         RstInfo rstInfo = new RstInfo(frame.getStreamId(), StreamStatus.from(frame.getVersion(), frame.getStatusCode()));
688         notifyOnRst(listener, rstInfo);
689 
690         if (stream != null)
691             removeStream(stream);
692     }
693 
694     private void onSettings(SettingsFrame frame)
695     {
696         Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
697         if (windowSizeSetting != null)
698         {
699             int windowSize = windowSizeSetting.value();
700             setWindowSize(windowSize);
701             if (LOG.isDebugEnabled())
702                 LOG.debug("Updated session window size to {}", windowSize);
703         }
704         Settings.Setting maxConcurrentStreamsSetting = frame.getSettings().get(Settings.ID.MAX_CONCURRENT_STREAMS);
705         if (maxConcurrentStreamsSetting != null)
706         {
707             int maxConcurrentStreamsValue = maxConcurrentStreamsSetting.value();
708             maxConcurrentLocalStreams = maxConcurrentStreamsValue;
709             if (LOG.isDebugEnabled())
710                 LOG.debug("Updated session maxConcurrentLocalStreams to {}", maxConcurrentStreamsValue);
711         }
712         SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted());
713         notifyOnSettings(listener, settingsInfo);
714     }
715 
716     private void onPing(PingFrame frame)
717     {
718         int pingId = frame.getPingId();
719         if (pingId % 2 == pingIds.get() % 2)
720         {
721             PingResultInfo pingResultInfo = new PingResultInfo(frame.getPingId());
722             notifyOnPing(listener, pingResultInfo);
723         }
724         else
725         {
726             control(null, frame, 0, TimeUnit.MILLISECONDS, Callback.Adapter.INSTANCE);
727         }
728     }
729 
730     private void onGoAway(GoAwayFrame frame)
731     {
732         if (goAwayReceived.compareAndSet(false, true))
733         {
734             GoAwayResultInfo goAwayResultInfo = new GoAwayResultInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode()));
735             notifyOnGoAway(listener, goAwayResultInfo);
736             // SPDY does not require to send back a response to a GO_AWAY.
737             // We notified the application of the last good stream id and
738             // tried our best to flush remaining data.
739         }
740     }
741 
742     private void onHeaders(HeadersFrame frame)
743     {
744         int streamId = frame.getStreamId();
745         IStream stream = streams.get(streamId);
746         if (stream == null)
747         {
748             RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
749             if (LOG.isDebugEnabled())
750                 LOG.debug("Unknown stream, {}", rstInfo);
751             rst(rstInfo, Callback.Adapter.INSTANCE);
752         }
753         else
754         {
755             processHeaders(stream, frame);
756         }
757     }
758 
759     private void processHeaders(IStream stream, HeadersFrame frame)
760     {
761         stream.process(frame);
762         if (stream.isClosed())
763             removeStream(stream);
764     }
765 
766     private void onWindowUpdate(WindowUpdateFrame frame)
767     {
768         int streamId = frame.getStreamId();
769         IStream stream = streams.get(streamId);
770         flowControlStrategy.onWindowUpdate(this, stream, frame.getWindowDelta());
771         flusher.flush();
772     }
773 
774     private void onCredential(CredentialFrame frame)
775     {
776         LOG.warn("{} frame not yet supported", frame.getType());
777     }
778 
779     protected void close()
780     {
781         // Check for null to support tests
782         if (controller != null)
783             controller.close(false);
784     }
785 
786     private void notifyOnFailure(SessionFrameListener listener, Throwable x)
787     {
788         try
789         {
790             if (listener != null)
791             {
792                 if (LOG.isDebugEnabled())
793                     LOG.debug("Invoking callback with {} on listener {}", x, listener);
794                 listener.onFailure(this, x);
795             }
796         }
797         catch (Exception xx)
798         {
799             LOG.info("Exception while notifying listener " + listener, xx);
800         }
801         catch (Error xx)
802         {
803             LOG.info("Exception while notifying listener " + listener, xx);
804             throw xx;
805         }
806     }
807 
808     private StreamFrameListener notifyOnPush(StreamFrameListener listener, Stream stream, PushInfo pushInfo)
809     {
810         try
811         {
812             if (listener == null)
813                 return null;
814             if (LOG.isDebugEnabled())
815                 LOG.debug("Invoking callback with {} on listener {}", pushInfo, listener);
816             return listener.onPush(stream, pushInfo);
817         }
818         catch (Exception x)
819         {
820             LOG.info("Exception while notifying listener " + listener, x);
821             return null;
822         }
823         catch (Error x)
824         {
825             LOG.info("Exception while notifying listener " + listener, x);
826             throw x;
827         }
828     }
829 
830     private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo)
831     {
832         try
833         {
834             if (listener == null)
835                 return null;
836             if (LOG.isDebugEnabled())
837                 LOG.debug("Invoking callback with {} on listener {}", synInfo, listener);
838             return listener.onSyn(stream, synInfo);
839         }
840         catch (Exception x)
841         {
842             LOG.info("Exception while notifying listener " + listener, x);
843             return null;
844         }
845         catch (Error x)
846         {
847             LOG.info("Exception while notifying listener " + listener, x);
848             throw x;
849         }
850     }
851 
852     private void notifyOnRst(SessionFrameListener listener, RstInfo rstInfo)
853     {
854         try
855         {
856             if (listener != null)
857             {
858                 if (LOG.isDebugEnabled())
859                     LOG.debug("Invoking callback with {} on listener {}", rstInfo, listener);
860                 listener.onRst(this, rstInfo);
861             }
862         }
863         catch (Exception x)
864         {
865             LOG.info("Exception while notifying listener " + listener, x);
866         }
867         catch (Error x)
868         {
869             LOG.info("Exception while notifying listener " + listener, x);
870             throw x;
871         }
872     }
873 
874     private void notifyOnSettings(SessionFrameListener listener, SettingsInfo settingsInfo)
875     {
876         try
877         {
878             if (listener != null)
879             {
880                 if (LOG.isDebugEnabled())
881                     LOG.debug("Invoking callback with {} on listener {}", settingsInfo, listener);
882                 listener.onSettings(this, settingsInfo);
883             }
884         }
885         catch (Exception x)
886         {
887             LOG.info("Exception while notifying listener " + listener, x);
888         }
889         catch (Error x)
890         {
891             LOG.info("Exception while notifying listener " + listener, x);
892             throw x;
893         }
894     }
895 
896     private void notifyOnPing(SessionFrameListener listener, PingResultInfo pingResultInfo)
897     {
898         try
899         {
900             if (listener != null)
901             {
902                 if (LOG.isDebugEnabled())
903                     LOG.debug("Invoking callback with {} on listener {}", pingResultInfo, listener);
904                 listener.onPing(this, pingResultInfo);
905             }
906         }
907         catch (Exception x)
908         {
909             LOG.info("Exception while notifying listener " + listener, x);
910         }
911         catch (Error x)
912         {
913             LOG.info("Exception while notifying listener " + listener, x);
914             throw x;
915         }
916     }
917 
918     private void notifyOnGoAway(SessionFrameListener listener, GoAwayResultInfo goAwayResultInfo)
919     {
920         try
921         {
922             if (listener != null)
923             {
924                 if (LOG.isDebugEnabled())
925                     LOG.debug("Invoking callback with {} on listener {}", goAwayResultInfo, listener);
926                 listener.onGoAway(this, goAwayResultInfo);
927             }
928         }
929         catch (Exception x)
930         {
931             LOG.info("Exception while notifying listener " + listener, x);
932         }
933         catch (Error x)
934         {
935             LOG.info("Exception while notifying listener " + listener, x);
936             throw x;
937         }
938     }
939 
940     @Override
941     public void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
942     {
943         generateAndEnqueueControlFrame(stream, frame, timeout, unit, callback);
944     }
945 
946     private void generateAndEnqueueControlFrame(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
947     {
948         try
949         {
950             // Synchronization is necessary, since we may have concurrent replies
951             // and those needs to be generated and enqueued atomically in order
952             // to maintain a correct compression context
953             ControlFrameBytes frameBytes;
954             Throwable throwable;
955             synchronized (this)
956             {
957                 ByteBuffer buffer = generator.control(frame);
958                 if (LOG.isDebugEnabled())
959                     LOG.debug("Queuing {} on {}", frame, stream);
960                 frameBytes = new ControlFrameBytes(stream, callback, frame, buffer);
961                 if (timeout > 0)
962                     frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
963 
964                 // Special handling for PING frames, they must be sent as soon as possible
965                 if (ControlFrameType.PING == frame.getType())
966                     throwable = flusher.prepend(frameBytes);
967                 else
968                     throwable = flusher.append(frameBytes);
969             }
970             // Flush MUST be done outside synchronized blocks
971             flush(frameBytes, throwable);
972         }
973         catch (Exception x)
974         {
975             notifyCallbackFailed(callback, x);
976         }
977     }
978 
979     private void updateLastStreamId(IStream stream)
980     {
981         int streamId = stream.getId();
982         if (streamId % 2 != streamIds.get() % 2)
983             Atomics.updateMax(lastStreamId, streamId);
984     }
985 
986     @Override
987     public void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Callback callback)
988     {
989         if (LOG.isDebugEnabled())
990             LOG.debug("Queuing {} on {}", dataInfo, stream);
991         DataFrameBytes frameBytes = new DataFrameBytes(stream, callback, dataInfo);
992         if (timeout > 0)
993             frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
994         flush(frameBytes, flusher.append(frameBytes));
995     }
996 
997     @Override
998     public void shutdown()
999     {
1000         CloseFrameBytes frameBytes = new CloseFrameBytes();
1001         flush(frameBytes, flusher.append(frameBytes));
1002     }
1003 
1004     private void flush(FrameBytes frameBytes, Throwable throwable)
1005     {
1006         if (throwable != null)
1007             frameBytes.failed(throwable);
1008         else
1009             flusher.flush();
1010     }
1011 
1012     private void complete(final Callback callback)
1013     {
1014         try
1015         {
1016             if (callback != null)
1017                 callback.succeeded();
1018         }
1019         catch (Throwable x)
1020         {
1021             LOG.info("Exception while notifying callback " + callback, x);
1022         }
1023     }
1024 
1025     private void notifyCallbackFailed(Callback callback, Throwable failure)
1026     {
1027         try
1028         {
1029             if (callback != null)
1030                 callback.failed(failure);
1031         }
1032         catch (Throwable x)
1033         {
1034             LOG.info("Exception while notifying callback " + callback, x);
1035         }
1036     }
1037 
1038     public int getWindowSize()
1039     {
1040         return flowControlStrategy.getWindowSize(this);
1041     }
1042 
1043     public void setWindowSize(int initialWindowSize)
1044     {
1045         flowControlStrategy.setWindowSize(this, initialWindowSize);
1046     }
1047 
1048     @Override
1049     public String toString()
1050     {
1051         return String.format("%s@%x{v%d,queueSize=%d,windowSize=%d,streams=%d}", getClass().getSimpleName(),
1052                 hashCode(), version, flusher.getQueueSize(), getWindowSize(), streams.size());
1053     }
1054 
1055     @Override
1056     public String dump()
1057     {
1058         return ContainerLifeCycle.dump(this);
1059     }
1060 
1061     @Override
1062     public void dump(Appendable out, String indent) throws IOException
1063     {
1064         ContainerLifeCycle.dumpObject(out, this);
1065         ContainerLifeCycle.dump(out, indent, Collections.singletonList(controller), streams.values());
1066     }
1067 
1068     public interface FrameBytes extends Comparable<FrameBytes>, Callback
1069     {
1070         public IStream getStream();
1071 
1072         public abstract ByteBuffer getByteBuffer();
1073     }
1074 
1075     abstract class AbstractFrameBytes implements FrameBytes, Runnable
1076     {
1077         private final IStream stream;
1078         private final Callback callback;
1079         protected volatile Scheduler.Task task;
1080 
1081         protected AbstractFrameBytes(IStream stream, Callback callback)
1082         {
1083             this.stream = stream;
1084             this.callback = Objects.requireNonNull(callback);
1085         }
1086 
1087         @Override
1088         public IStream getStream()
1089         {
1090             return stream;
1091         }
1092 
1093         @Override
1094         public int compareTo(FrameBytes that)
1095         {
1096             // FrameBytes may have or not have a related stream (for example, PING do not have a related stream)
1097             // FrameBytes without related streams have higher priority
1098             IStream thisStream = getStream();
1099             IStream thatStream = that.getStream();
1100             if (thisStream == null)
1101                 return thatStream == null ? 0 : -1;
1102             if (thatStream == null)
1103                 return 1;
1104             // If this.stream.priority > that.stream.priority => this.stream has less priority than that.stream
1105             return thatStream.getPriority() - thisStream.getPriority();
1106         }
1107 
1108         private void cancelTask()
1109         {
1110             Scheduler.Task task = this.task;
1111             if (task != null)
1112                 task.cancel();
1113         }
1114 
1115         @Override
1116         public void run()
1117         {
1118             close();
1119             failed(new InterruptedByTimeoutException());
1120         }
1121 
1122         @Override
1123         public void succeeded()
1124         {
1125             cancelTask();
1126             StandardSession.this.complete(callback);
1127         }
1128 
1129         @Override
1130         public void failed(Throwable x)
1131         {
1132             cancelTask();
1133             notifyCallbackFailed(callback, x);
1134         }
1135     }
1136 
1137     protected class ControlFrameBytes extends AbstractFrameBytes
1138     {
1139         private final ControlFrame frame;
1140         private final ByteBuffer buffer;
1141 
1142         private ControlFrameBytes(IStream stream, Callback callback, ControlFrame frame, ByteBuffer buffer)
1143         {
1144             super(stream, callback);
1145             this.frame = frame;
1146             this.buffer = buffer;
1147         }
1148 
1149         @Override
1150         public ByteBuffer getByteBuffer()
1151         {
1152             return buffer;
1153         }
1154 
1155         @Override
1156         public void succeeded()
1157         {
1158             bufferPool.release(buffer);
1159 
1160             super.succeeded();
1161 
1162             if (frame.getType() == ControlFrameType.GO_AWAY)
1163             {
1164                 // After sending a GO_AWAY we need to hard close the connection.
1165                 // Recipients will know the last good stream id and act accordingly.
1166                 close();
1167             }
1168             IStream stream = getStream();
1169             if (stream != null && stream.isClosed())
1170                 removeStream(stream);
1171         }
1172 
1173         @Override
1174         public String toString()
1175         {
1176             return frame.toString();
1177         }
1178     }
1179 
1180     protected class DataFrameBytes extends AbstractFrameBytes
1181     {
1182         private final DataInfo dataInfo;
1183         private int size;
1184         private volatile ByteBuffer buffer;
1185 
1186         private DataFrameBytes(IStream stream, Callback handler, DataInfo dataInfo)
1187         {
1188             super(stream, handler);
1189             this.dataInfo = dataInfo;
1190         }
1191 
1192         @Override
1193         public ByteBuffer getByteBuffer()
1194         {
1195             try
1196             {
1197                 IStream stream = getStream();
1198                 int windowSize = stream.getWindowSize();
1199 
1200                 // TODO: optimization
1201                 // Right now, we use the windowSize to chunk big buffers.
1202                 // However, if the window size is large, we may congest the
1203                 // connection, or favor one stream that does a big download,
1204                 // starving the other streams.
1205                 // Also, SPDY DATA frames have a maximum of 16 MiB size, which
1206                 // is not enforced here.
1207                 // We should have a configurable "preferredDataFrameSize"
1208                 // (or even better autotuning) that will allow to send chunks
1209                 // that will not starve other streams and small enough to
1210                 // not congest the connection, while avoiding to send too many
1211                 // TCP packets.
1212                 // See also comment in class Flusher.
1213 
1214                 size = dataInfo.available();
1215                 if (size > windowSize)
1216                     size = windowSize;
1217 
1218                 buffer = generator.data(stream.getId(), size, dataInfo);
1219                 return buffer;
1220             }
1221             catch (Throwable x)
1222             {
1223                 failed(x);
1224                 return null;
1225             }
1226         }
1227 
1228         @Override
1229         public void succeeded()
1230         {
1231             bufferPool.release(buffer);
1232             IStream stream = getStream();
1233             dataInfo.consume(size);
1234             flowControlStrategy.updateWindow(StandardSession.this, stream, -size);
1235             if (dataInfo.available() > 0)
1236             {
1237                 // We have written a frame out of this DataInfo, but there is more to write.
1238                 // We need to keep the correct ordering of frames, to avoid that another
1239                 // DataInfo for the same stream is written before this one is finished.
1240                 flush(this, flusher.prepend(this));
1241             }
1242             else
1243             {
1244                 super.succeeded();
1245                 stream.updateCloseState(dataInfo.isClose(), true);
1246                 if (stream.isClosed())
1247                     removeStream(stream);
1248             }
1249         }
1250 
1251         @Override
1252         public String toString()
1253         {
1254             return String.format("DATA bytes @%x available=%d consumed=%d on %s", dataInfo.hashCode(), dataInfo.available(), dataInfo.consumed(), getStream());
1255         }
1256     }
1257 
1258     protected class CloseFrameBytes extends AbstractFrameBytes
1259     {
1260         private CloseFrameBytes()
1261         {
1262             super(null, Callback.Adapter.INSTANCE);
1263         }
1264 
1265         @Override
1266         public ByteBuffer getByteBuffer()
1267         {
1268             return BufferUtil.EMPTY_BUFFER;
1269         }
1270 
1271         @Override
1272         public void succeeded()
1273         {
1274             super.succeeded();
1275             close();
1276         }
1277     }
1278 
1279     private static class PingInfoCallback extends PingResultInfo implements Callback
1280     {
1281         private final Promise<PingResultInfo> promise;
1282 
1283         public PingInfoCallback(int pingId, Promise<PingResultInfo> promise)
1284         {
1285             super(pingId);
1286             this.promise = promise;
1287         }
1288 
1289         @Override
1290         public void succeeded()
1291         {
1292             if (promise != null)
1293                 promise.succeeded(this);
1294         }
1295 
1296         @Override
1297         public void failed(Throwable x)
1298         {
1299             if (promise != null)
1300                 promise.failed(x);
1301         }
1302     }
1303 }