View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.nio.ByteBuffer;
24  import java.nio.channels.InterruptedByTimeoutException;
25  import java.util.ArrayList;
26  import java.util.Collections;
27  import java.util.HashSet;
28  import java.util.LinkedList;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Objects;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.CopyOnWriteArrayList;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.TimeoutException;
40  import java.util.concurrent.atomic.AtomicBoolean;
41  import java.util.concurrent.atomic.AtomicInteger;
42  
43  import org.eclipse.jetty.io.ByteBufferPool;
44  import org.eclipse.jetty.io.EndPoint;
45  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
46  import org.eclipse.jetty.spdy.api.DataInfo;
47  import org.eclipse.jetty.spdy.api.GoAwayInfo;
48  import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
49  import org.eclipse.jetty.spdy.api.PingInfo;
50  import org.eclipse.jetty.spdy.api.PingResultInfo;
51  import org.eclipse.jetty.spdy.api.PushInfo;
52  import org.eclipse.jetty.spdy.api.RstInfo;
53  import org.eclipse.jetty.spdy.api.SPDYException;
54  import org.eclipse.jetty.spdy.api.Session;
55  import org.eclipse.jetty.spdy.api.SessionFrameListener;
56  import org.eclipse.jetty.spdy.api.SessionStatus;
57  import org.eclipse.jetty.spdy.api.Settings;
58  import org.eclipse.jetty.spdy.api.SettingsInfo;
59  import org.eclipse.jetty.spdy.api.Stream;
60  import org.eclipse.jetty.spdy.api.StreamFrameListener;
61  import org.eclipse.jetty.spdy.api.StreamStatus;
62  import org.eclipse.jetty.spdy.api.SynInfo;
63  import org.eclipse.jetty.spdy.frames.ControlFrame;
64  import org.eclipse.jetty.spdy.frames.ControlFrameType;
65  import org.eclipse.jetty.spdy.frames.CredentialFrame;
66  import org.eclipse.jetty.spdy.frames.DataFrame;
67  import org.eclipse.jetty.spdy.frames.GoAwayFrame;
68  import org.eclipse.jetty.spdy.frames.HeadersFrame;
69  import org.eclipse.jetty.spdy.frames.PingFrame;
70  import org.eclipse.jetty.spdy.frames.RstStreamFrame;
71  import org.eclipse.jetty.spdy.frames.SettingsFrame;
72  import org.eclipse.jetty.spdy.frames.SynReplyFrame;
73  import org.eclipse.jetty.spdy.frames.SynStreamFrame;
74  import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
75  import org.eclipse.jetty.spdy.generator.Generator;
76  import org.eclipse.jetty.spdy.parser.Parser;
77  import org.eclipse.jetty.util.Atomics;
78  import org.eclipse.jetty.util.BufferUtil;
79  import org.eclipse.jetty.util.Callback;
80  import org.eclipse.jetty.util.ForkInvoker;
81  import org.eclipse.jetty.util.FutureCallback;
82  import org.eclipse.jetty.util.FuturePromise;
83  import org.eclipse.jetty.util.Promise;
84  import org.eclipse.jetty.util.component.ContainerLifeCycle;
85  import org.eclipse.jetty.util.component.Dumpable;
86  import org.eclipse.jetty.util.log.Log;
87  import org.eclipse.jetty.util.log.Logger;
88  import org.eclipse.jetty.util.thread.Scheduler;
89  
90  public class StandardSession implements ISession, Parser.Listener, Dumpable
91  {
92      private static final Logger LOG = Log.getLogger(Session.class); // logger is obtained for the Session API type, not for StandardSession
93  
94      private final ForkInvoker<Callback> invoker = new SessionInvoker(); // SessionInvoker is declared outside this section
95      private final Map<String, Object> attributes = new ConcurrentHashMap<>(); // backs getAttribute/setAttribute/removeAttribute
96      private final List<Listener> listeners = new CopyOnWriteArrayList<>(); // StreamListener entries are told about stream creation and removal
97      private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>(); // registered streams keyed by stream id
98      private final LinkedList<FrameBytes> queue = new LinkedList<>(); // queued FrameBytes; removeFrameBytesFromQueue mutates it under synchronized (queue)
99      private final ByteBufferPool bufferPool;
100     private final Executor threadPool;
101     private final Scheduler scheduler;
102     private final short version; // SPDY version returned by getVersion() and used when building frames
103     private final Controller controller; // closed by close(); may be null (close() checks, "to support tests")
104     private final EndPoint endPoint; // supplies the local and remote socket addresses
105     private final IdleListener idleListener; // notified before and after each frame is processed; may be null
106     private final AtomicInteger streamIds; // next local stream id; advanced by 2 for every syn()
107     private final AtomicInteger pingIds; // next local ping id; advanced by 2 for every ping()
108     private final SessionFrameListener listener; // session-level listener; the notifyOnXxx helpers tolerate null
109     private final Generator generator;
110     private final AtomicBoolean goAwaySent = new AtomicBoolean(); // flipped by goAway(); once true, received frames are skipped
111     private final AtomicBoolean goAwayReceived = new AtomicBoolean(); // flipped when the first GO_AWAY frame arrives
112     private final AtomicInteger lastStreamId = new AtomicInteger(); // value reported in outgoing GO_AWAY frames
113     private final AtomicInteger localStreamCount = new AtomicInteger(0); // incremented in createStream for local streams, decremented in removeStream
114     private final FlowControlStrategy flowControlStrategy; // told about new streams, received/consumed data and window updates
115     private volatile int maxConcurrentLocalStreams = -1; // -1 disables the limit check in createStream; overwritten by a received MAX_CONCURRENT_STREAMS setting
116     private boolean flushing; // NOTE(review): not used in this section - presumably flush state, confirm against flush()
117     private Throwable failure; // NOTE(review): not used in this section - confirm where it is set and read
118 
/**
 * Creates a session bound to the given collaborators.
 * <p>The stream id counter and the ping id counter are both seeded with
 * {@code initialStreamId}; every other argument is stored as given.</p>
 *
 * @param version the SPDY protocol version used when building frames
 * @param bufferPool the byte buffer pool
 * @param threadPool the executor
 * @param scheduler the scheduler
 * @param controller the controller closed by {@link #close()}; may be null (tests)
 * @param endPoint the end point supplying the local and remote addresses
 * @param idleListener the listener notified around frame processing; may be null
 * @param initialStreamId the first value handed out for both stream ids and ping ids
 * @param listener the session-level frame listener; may be null
 * @param generator the frame generator
 * @param flowControlStrategy the flow control strategy consulted for streams and data
 */
public StandardSession(short version, ByteBufferPool bufferPool, Executor threadPool, Scheduler scheduler,
                       Controller controller, EndPoint endPoint, IdleListener idleListener, int initialStreamId,
                       SessionFrameListener listener, Generator generator, FlowControlStrategy flowControlStrategy)
{
    // TODO this should probably be an aggregate lifecycle

    // Protocol level collaborators
    this.version = version;
    this.generator = generator;
    this.flowControlStrategy = flowControlStrategy;

    // Infrastructure
    this.bufferPool = bufferPool;
    this.threadPool = threadPool;
    this.scheduler = scheduler;
    this.controller = controller;
    this.endPoint = endPoint;

    // Listeners
    this.idleListener = idleListener;
    this.listener = listener;

    // Stream ids and ping ids start from the same seed
    this.streamIds = new AtomicInteger(initialStreamId);
    this.pingIds = new AtomicInteger(initialStreamId);
}
137 
138     @Override
139     public short getVersion()
140     {
141         return version;
142     }
143 
144     @Override
145     public void addListener(Listener listener)
146     {
147         listeners.add(listener);
148     }
149 
150     @Override
151     public void removeListener(Listener listener)
152     {
153         listeners.remove(listener);
154     }
155 
156     @Override
157     public Stream syn(SynInfo synInfo, StreamFrameListener listener) throws ExecutionException, InterruptedException, TimeoutException
158     {
159         FuturePromise<Stream> result = new FuturePromise<>();
160         syn(synInfo, listener, result);
161         if (synInfo.getTimeout() > 0)
162             return result.get(synInfo.getTimeout(), synInfo.getUnit());
163         else
164             return result.get();
165     }
166 
167     @Override
168     public void syn(SynInfo synInfo, StreamFrameListener listener, Promise<Stream> promise)
169     {
170         // Synchronization is necessary.
171         // SPEC v3, 2.3.1 requires that the stream creation be monotonically crescent
172         // so we cannot allow thread1 to create stream1 and thread2 create stream3 and
173         // have stream3 hit the network before stream1, not only to comply with the spec
174         // but also because the compression context for the headers would be wrong, as the
175         // frame with a compression history will come before the first compressed frame.
176         int associatedStreamId = 0;
177         if (synInfo instanceof PushSynInfo)
178             associatedStreamId = ((PushSynInfo)synInfo).getAssociatedStreamId();
179 
180         synchronized (this)
181         {
182             int streamId = streamIds.getAndAdd(2);
183             // TODO: for SPDYv3 we need to support the "slot" argument
184             SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, associatedStreamId, synInfo.getPriority(), (short)0, synInfo.getHeaders());
185             IStream stream = createStream(synStream, listener, true, promise);
186             if (stream == null)
187                 return;
188             generateAndEnqueueControlFrame(stream, synStream, synInfo.getTimeout(), synInfo.getUnit(), stream);
189         }
190         flush();
191     }
192 
193     @Override
194     public void rst(RstInfo rstInfo) throws InterruptedException, ExecutionException, TimeoutException
195     {
196         FutureCallback result = new FutureCallback();
197         rst(rstInfo, result);
198         if (rstInfo.getTimeout() > 0)
199             result.get(rstInfo.getTimeout(), rstInfo.getUnit());
200         else
201             result.get();
202     }
203 
204     @Override
205     public void rst(RstInfo rstInfo, Callback callback)
206     {
207         // SPEC v3, 2.2.2
208         if (goAwaySent.get())
209         {
210             complete(callback);
211         }
212         else
213         {
214             int streamId = rstInfo.getStreamId();
215             IStream stream = streams.get(streamId);
216             RstStreamFrame frame = new RstStreamFrame(version, streamId, rstInfo.getStreamStatus().getCode(version));
217             control(stream, frame, rstInfo.getTimeout(), rstInfo.getUnit(), callback);
218             if (stream != null)
219             {
220                 stream.process(frame);
221                 removeFrameBytesFromQueue(stream);
222                 removeStream(stream);
223             }
224         }
225     }
226 
227     private void removeFrameBytesFromQueue(Stream stream)
228     {
229         synchronized (queue)
230         {
231             for (FrameBytes frameBytes : queue)
232                 if (frameBytes.getStream() == stream)
233                     queue.remove(frameBytes);
234         }
235     }
236 
237     @Override
238     public void settings(SettingsInfo settingsInfo) throws ExecutionException, InterruptedException, TimeoutException
239     {
240         FutureCallback result = new FutureCallback();
241         settings(settingsInfo, result);
242         if (settingsInfo.getTimeout() > 0)
243             result.get(settingsInfo.getTimeout(), settingsInfo.getUnit());
244         else
245             result.get();
246     }
247 
248     @Override
249     public void settings(SettingsInfo settingsInfo, Callback callback)
250     {
251         SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings());
252         control(null, frame, settingsInfo.getTimeout(), settingsInfo.getUnit(), callback);
253     }
254 
255     @Override
256     public PingResultInfo ping(PingInfo pingInfo) throws ExecutionException, InterruptedException, TimeoutException
257     {
258         FuturePromise<PingResultInfo> result = new FuturePromise<>();
259         ping(pingInfo, result);
260         if (pingInfo.getTimeout() > 0)
261             return result.get(pingInfo.getTimeout(), pingInfo.getUnit());
262         else
263             return result.get();
264     }
265 
266     @Override
267     public void ping(PingInfo pingInfo, Promise<PingResultInfo> promise)
268     {
269         int pingId = pingIds.getAndAdd(2);
270         PingInfoCallback pingInfoCallback = new PingInfoCallback(pingId, promise);
271         PingFrame frame = new PingFrame(version, pingId);
272         control(null, frame, pingInfo.getTimeout(), pingInfo.getUnit(), pingInfoCallback);
273     }
274 
275     @Override
276     public void goAway(GoAwayInfo goAwayInfo) throws ExecutionException, InterruptedException, TimeoutException
277     {
278         goAway(goAwayInfo, SessionStatus.OK);
279     }
280 
281     private void goAway(GoAwayInfo goAwayInfo, SessionStatus sessionStatus) throws ExecutionException, InterruptedException, TimeoutException
282     {
283         FutureCallback result = new FutureCallback();
284         goAway(sessionStatus, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), result);
285         if (goAwayInfo.getTimeout() > 0)
286             result.get(goAwayInfo.getTimeout(), goAwayInfo.getUnit());
287         else
288             result.get();
289     }
290 
291     @Override
292     public void goAway(GoAwayInfo goAwayInfo, Callback callback)
293     {
294         goAway(SessionStatus.OK, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), callback);
295     }
296 
297     private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Callback callback)
298     {
299         if (goAwaySent.compareAndSet(false, true))
300         {
301             if (!goAwayReceived.get())
302             {
303                 GoAwayFrame frame = new GoAwayFrame(version, lastStreamId.get(), sessionStatus.getCode());
304                 control(null, frame, timeout, unit, callback);
305                 return;
306             }
307         }
308         complete(callback);
309     }
310 
311     @Override
312     public Set<Stream> getStreams()
313     {
314         Set<Stream> result = new HashSet<>();
315         result.addAll(streams.values());
316         return result;
317     }
318 
319     @Override
320     public IStream getStream(int streamId)
321     {
322         return streams.get(streamId);
323     }
324 
325     @Override
326     public Object getAttribute(String key)
327     {
328         return attributes.get(key);
329     }
330 
331     @Override
332     public void setAttribute(String key, Object value)
333     {
334         attributes.put(key, value);
335     }
336 
337     @Override
338     public Object removeAttribute(String key)
339     {
340         return attributes.remove(key);
341     }
342 
343     @Override
344     public InetSocketAddress getLocalAddress()
345     {
346         return endPoint.getLocalAddress();
347     }
348 
349     @Override
350     public InetSocketAddress getRemoteAddress()
351     {
352         return endPoint.getRemoteAddress();
353     }
354 
355     @Override
356     public void onControlFrame(ControlFrame frame)
357     {
358         notifyIdle(idleListener, false);
359         try
360         {
361             LOG.debug("Processing {}", frame);
362 
363             if (goAwaySent.get())
364             {
365                 LOG.debug("Skipped processing of {}", frame);
366                 return;
367             }
368 
369             switch (frame.getType())
370             {
371                 case SYN_STREAM:
372                 {
373                     onSyn((SynStreamFrame)frame);
374                     break;
375                 }
376                 case SYN_REPLY:
377                 {
378                     onReply((SynReplyFrame)frame);
379                     break;
380                 }
381                 case RST_STREAM:
382                 {
383                     onRst((RstStreamFrame)frame);
384                     break;
385                 }
386                 case SETTINGS:
387                 {
388                     onSettings((SettingsFrame)frame);
389                     break;
390                 }
391                 case NOOP:
392                 {
393                     // Just ignore it
394                     break;
395                 }
396                 case PING:
397                 {
398                     onPing((PingFrame)frame);
399                     break;
400                 }
401                 case GO_AWAY:
402                 {
403                     onGoAway((GoAwayFrame)frame);
404                     break;
405                 }
406                 case HEADERS:
407                 {
408                     onHeaders((HeadersFrame)frame);
409                     break;
410                 }
411                 case WINDOW_UPDATE:
412                 {
413                     onWindowUpdate((WindowUpdateFrame)frame);
414                     break;
415                 }
416                 case CREDENTIAL:
417                 {
418                     onCredential((CredentialFrame)frame);
419                     break;
420                 }
421                 default:
422                 {
423                     throw new IllegalStateException();
424                 }
425             }
426         }
427         finally
428         {
429             notifyIdle(idleListener, true);
430         }
431     }
432 
433     @Override
434     public void onDataFrame(DataFrame frame, ByteBuffer data)
435     {
436         notifyIdle(idleListener, false);
437         try
438         {
439             LOG.debug("Processing {}, {} data bytes", frame, data.remaining());
440 
441             if (goAwaySent.get())
442             {
443                 LOG.debug("Skipped processing of {}", frame);
444                 return;
445             }
446 
447             int streamId = frame.getStreamId();
448             IStream stream = streams.get(streamId);
449             if (stream == null)
450             {
451                 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
452                 LOG.debug("Unknown stream {}", rstInfo);
453                 rst(rstInfo, new Callback.Adapter());
454             }
455             else
456             {
457                 processData(stream, frame, data);
458             }
459         }
460         finally
461         {
462             notifyIdle(idleListener, true);
463         }
464     }
465 
466     private void notifyIdle(IdleListener listener, boolean idle)
467     {
468         if (listener != null)
469             listener.onIdle(idle);
470     }
471 
472     private void processData(final IStream stream, DataFrame frame, ByteBuffer data)
473     {
474         ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data, frame.isClose())
475         {
476             @Override
477             public void consume(int delta)
478             {
479                 super.consume(delta);
480                 flowControlStrategy.onDataConsumed(StandardSession.this, stream, this, delta);
481             }
482         };
483         flowControlStrategy.onDataReceived(this, stream, dataInfo);
484         stream.process(dataInfo);
485         if (stream.isClosed())
486             removeStream(stream);
487     }
488 
489     @Override
490     public void onStreamException(StreamException x)
491     {
492         notifyOnException(listener, x);
493         rst(new RstInfo(x.getStreamId(), x.getStreamStatus()), new Callback.Adapter());
494     }
495 
496     @Override
497     public void onSessionException(SessionException x)
498     {
499         Throwable cause = x.getCause();
500         notifyOnException(listener, cause == null ? x : cause);
501         goAway(x.getSessionStatus(), 0, TimeUnit.SECONDS, new Callback.Adapter());
502     }
503 
504     private void onSyn(final SynStreamFrame frame)
505     {
506         IStream stream = createStream(frame, null, false, new Promise.Adapter<Stream>()
507         {
508             @Override
509             public void failed(Throwable x)
510             {
511                 LOG.debug("Received: {} but creating new Stream failed: {}", frame, x.getMessage());
512             }
513         });
514         if (stream != null)
515             processSyn(listener, stream, frame);
516     }
517 
518     private void processSyn(SessionFrameListener listener, IStream stream, SynStreamFrame frame)
519     {
520         stream.process(frame);
521         // Update the last stream id before calling the application (which may send a GO_AWAY)
522         updateLastStreamId(stream);
523         StreamFrameListener streamListener;
524         if (stream.isUnidirectional())
525         {
526             PushInfo pushInfo = new PushInfo(frame.getHeaders(), frame.isClose());
527             streamListener = notifyOnPush(stream.getAssociatedStream().getStreamFrameListener(), stream, pushInfo);
528         }
529         else
530         {
531             SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority());
532             streamListener = notifyOnSyn(listener, stream, synInfo);
533         }
534         stream.setStreamFrameListener(streamListener);
535         flush();
536         // The onSyn() listener may have sent a frame that closed the stream
537         if (stream.isClosed())
538             removeStream(stream);
539     }
540 
541     private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local, Promise<Stream> promise)
542     {
543         IStream associatedStream = streams.get(frame.getAssociatedStreamId());
544         IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream, promise);
545         flowControlStrategy.onNewStream(this, stream);
546 
547         stream.updateCloseState(frame.isClose(), local);
548         stream.setStreamFrameListener(listener);
549 
550         if (stream.isUnidirectional())
551         {
552             // Unidirectional streams are implicitly half closed
553             stream.updateCloseState(true, !local);
554             if (!stream.isClosed())
555                 stream.getAssociatedStream().associate(stream);
556         }
557 
558         int streamId = stream.getId();
559 
560         if (local)
561         {
562             while (true)
563             {
564                 int oldStreamCountValue = localStreamCount.get();
565                 int maxConcurrentStreams = maxConcurrentLocalStreams;
566                 if (maxConcurrentStreams > -1 && oldStreamCountValue >= maxConcurrentStreams)
567                 {
568                     String message = String.format("Max concurrent local streams (%d) exceeded.",
569                             maxConcurrentStreams);
570                     LOG.debug(message);
571                     promise.failed(new SPDYException(message));
572                     return null;
573                 }
574                 if (localStreamCount.compareAndSet(oldStreamCountValue, oldStreamCountValue + 1))
575                     break;
576             }
577         }
578 
579         if (streams.putIfAbsent(streamId, stream) != null)
580         {
581             String message = "Duplicate stream id " + streamId;
582             IllegalStateException duplicateIdException = new IllegalStateException(message);
583             promise.failed(duplicateIdException);
584             if (local)
585             {
586                 localStreamCount.decrementAndGet();
587                 throw duplicateIdException;
588             }
589             RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR);
590             LOG.debug("Duplicate stream, {}", rstInfo);
591             rst(rstInfo, new Callback.Adapter()); // We don't care (too much) if the reset fails.
592             return null;
593         }
594         else
595         {
596             LOG.debug("Created {}", stream);
597             notifyStreamCreated(stream);
598             return stream;
599         }
600     }
601 
602     private void notifyStreamCreated(IStream stream)
603     {
604         for (Listener listener : listeners)
605         {
606             if (listener instanceof StreamListener)
607             {
608                 try
609                 {
610                     ((StreamListener)listener).onStreamCreated(stream);
611                 }
612                 catch (Exception x)
613                 {
614                     LOG.info("Exception while notifying listener " + listener, x);
615                 }
616                 catch (Error x)
617                 {
618                     LOG.info("Exception while notifying listener " + listener, x);
619                     throw x;
620                 }
621             }
622         }
623     }
624 
625     private void removeStream(IStream stream)
626     {
627         if (stream.isUnidirectional())
628             stream.getAssociatedStream().disassociate(stream);
629 
630         IStream removed = streams.remove(stream.getId());
631         if (removed != null)
632         {
633             assert removed == stream;
634 
635             if (streamIds.get() % 2 == stream.getId() % 2)
636                 localStreamCount.decrementAndGet();
637 
638             LOG.debug("Removed {}", stream);
639             notifyStreamClosed(stream);
640         }
641     }
642 
643     private void notifyStreamClosed(IStream stream)
644     {
645         for (Listener listener : listeners)
646         {
647             if (listener instanceof StreamListener)
648             {
649                 try
650                 {
651                     ((StreamListener)listener).onStreamClosed(stream);
652                 }
653                 catch (Exception x)
654                 {
655                     LOG.info("Exception while notifying listener " + listener, x);
656                 }
657                 catch (Error x)
658                 {
659                     LOG.info("Exception while notifying listener " + listener, x);
660                     throw x;
661                 }
662             }
663         }
664     }
665 
666     private void onReply(SynReplyFrame frame)
667     {
668         int streamId = frame.getStreamId();
669         IStream stream = streams.get(streamId);
670         if (stream == null)
671         {
672             RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
673             LOG.debug("Unknown stream {}", rstInfo);
674             rst(rstInfo, new Callback.Adapter());
675         }
676         else
677         {
678             processReply(stream, frame);
679         }
680     }
681 
682     private void processReply(IStream stream, SynReplyFrame frame)
683     {
684         stream.process(frame);
685         if (stream.isClosed())
686             removeStream(stream);
687     }
688 
689     private void onRst(RstStreamFrame frame)
690     {
691         IStream stream = streams.get(frame.getStreamId());
692 
693         if (stream != null)
694             stream.process(frame);
695 
696         RstInfo rstInfo = new RstInfo(frame.getStreamId(), StreamStatus.from(frame.getVersion(), frame.getStatusCode()));
697         notifyOnRst(listener, rstInfo);
698         flush();
699 
700         if (stream != null)
701             removeStream(stream);
702     }
703 
704     private void onSettings(SettingsFrame frame)
705     {
706         Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
707         if (windowSizeSetting != null)
708         {
709             int windowSize = windowSizeSetting.value();
710             setWindowSize(windowSize);
711             LOG.debug("Updated session window size to {}", windowSize);
712         }
713         Settings.Setting maxConcurrentStreamsSetting = frame.getSettings().get(Settings.ID.MAX_CONCURRENT_STREAMS);
714         if (maxConcurrentStreamsSetting != null)
715         {
716             int maxConcurrentStreamsValue = maxConcurrentStreamsSetting.value();
717             maxConcurrentLocalStreams = maxConcurrentStreamsValue;
718             LOG.debug("Updated session maxConcurrentLocalStreams to {}", maxConcurrentStreamsValue);
719         }
720         SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted());
721         notifyOnSettings(listener, settingsInfo);
722         flush();
723     }
724 
725     private void onPing(PingFrame frame)
726     {
727         int pingId = frame.getPingId();
728         if (pingId % 2 == pingIds.get() % 2)
729         {
730             PingResultInfo pingResultInfo = new PingResultInfo(frame.getPingId());
731             notifyOnPing(listener, pingResultInfo);
732             flush();
733         }
734         else
735         {
736             control(null, frame, 0, TimeUnit.MILLISECONDS, new Callback.Adapter());
737         }
738     }
739 
740     private void onGoAway(GoAwayFrame frame)
741     {
742         if (goAwayReceived.compareAndSet(false, true))
743         {
744             //TODO: Find a better name for GoAwayResultInfo
745             GoAwayResultInfo goAwayResultInfo = new GoAwayResultInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode()));
746             notifyOnGoAway(listener, goAwayResultInfo);
747             flush();
748             // SPDY does not require to send back a response to a GO_AWAY.
749             // We notified the application of the last good stream id and
750             // tried our best to flush remaining data.
751         }
752     }
753 
754     private void onHeaders(HeadersFrame frame)
755     {
756         int streamId = frame.getStreamId();
757         IStream stream = streams.get(streamId);
758         if (stream == null)
759         {
760             RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
761             LOG.debug("Unknown stream, {}", rstInfo);
762             rst(rstInfo, new Callback.Adapter());
763         }
764         else
765         {
766             processHeaders(stream, frame);
767         }
768     }
769 
770     private void processHeaders(IStream stream, HeadersFrame frame)
771     {
772         stream.process(frame);
773         if (stream.isClosed())
774             removeStream(stream);
775     }
776 
777     private void onWindowUpdate(WindowUpdateFrame frame)
778     {
779         int streamId = frame.getStreamId();
780         IStream stream = streams.get(streamId);
781         flowControlStrategy.onWindowUpdate(this, stream, frame.getWindowDelta());
782         flush();
783     }
784 
785     private void onCredential(CredentialFrame frame)
786     {
787         LOG.warn("{} frame not yet supported", frame.getType());
788         flush();
789     }
790 
791     protected void close()
792     {
793         // Check for null to support tests
794         if (controller != null)
795             controller.close(false);
796     }
797 
798     private void notifyOnException(SessionFrameListener listener, Throwable x)
799     {
800         try
801         {
802             if (listener != null)
803             {
804                 LOG.debug("Invoking callback with {} on listener {}", x, listener);
805                 listener.onException(x);
806             }
807         }
808         catch (Exception xx)
809         {
810             LOG.info("Exception while notifying listener " + listener, xx);
811         }
812         catch (Error xx)
813         {
814             LOG.info("Exception while notifying listener " + listener, xx);
815             throw xx;
816         }
817     }
818 
819     private StreamFrameListener notifyOnPush(StreamFrameListener listener, Stream stream, PushInfo pushInfo)
820     {
821         try
822         {
823             if (listener == null)
824                 return null;
825             LOG.debug("Invoking callback with {} on listener {}", pushInfo, listener);
826             return listener.onPush(stream, pushInfo);
827         }
828         catch (Exception x)
829         {
830             LOG.info("Exception while notifying listener " + listener, x);
831             return null;
832         }
833         catch (Error x)
834         {
835             LOG.info("Exception while notifying listener " + listener, x);
836             throw x;
837         }
838     }
839 
840     private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo)
841     {
842         try
843         {
844             if (listener == null)
845                 return null;
846             LOG.debug("Invoking callback with {} on listener {}", synInfo, listener);
847             return listener.onSyn(stream, synInfo);
848         }
849         catch (Exception x)
850         {
851             LOG.info("Exception while notifying listener " + listener, x);
852             return null;
853         }
854         catch (Error x)
855         {
856             LOG.info("Exception while notifying listener " + listener, x);
857             throw x;
858         }
859     }
860 
861     private void notifyOnRst(SessionFrameListener listener, RstInfo rstInfo)
862     {
863         try
864         {
865             if (listener != null)
866             {
867                 LOG.debug("Invoking callback with {} on listener {}", rstInfo, listener);
868                 listener.onRst(this, rstInfo);
869             }
870         }
871         catch (Exception x)
872         {
873             LOG.info("Exception while notifying listener " + listener, x);
874         }
875         catch (Error x)
876         {
877             LOG.info("Exception while notifying listener " + listener, x);
878             throw x;
879         }
880     }
881 
882     private void notifyOnSettings(SessionFrameListener listener, SettingsInfo settingsInfo)
883     {
884         try
885         {
886             if (listener != null)
887             {
888                 LOG.debug("Invoking callback with {} on listener {}", settingsInfo, listener);
889                 listener.onSettings(this, settingsInfo);
890             }
891         }
892         catch (Exception x)
893         {
894             LOG.info("Exception while notifying listener " + listener, x);
895         }
896         catch (Error x)
897         {
898             LOG.info("Exception while notifying listener " + listener, x);
899             throw x;
900         }
901     }
902 
903     private void notifyOnPing(SessionFrameListener listener, PingResultInfo pingResultInfo)
904     {
905         try
906         {
907             if (listener != null)
908             {
909                 LOG.debug("Invoking callback with {} on listener {}", pingResultInfo, listener);
910                 listener.onPing(this, pingResultInfo);
911             }
912         }
913         catch (Exception x)
914         {
915             LOG.info("Exception while notifying listener " + listener, x);
916         }
917         catch (Error x)
918         {
919             LOG.info("Exception while notifying listener " + listener, x);
920             throw x;
921         }
922     }
923 
924     private void notifyOnGoAway(SessionFrameListener listener, GoAwayResultInfo goAwayResultInfo)
925     {
926         try
927         {
928             if (listener != null)
929             {
930                 LOG.debug("Invoking callback with {} on listener {}", goAwayResultInfo, listener);
931                 listener.onGoAway(this, goAwayResultInfo);
932             }
933         }
934         catch (Exception x)
935         {
936             LOG.info("Exception while notifying listener " + listener, x);
937         }
938         catch (Error x)
939         {
940             LOG.info("Exception while notifying listener " + listener, x);
941             throw x;
942         }
943     }
944 
945 
946     @Override
947     public void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
948     {
949         generateAndEnqueueControlFrame(stream, frame, timeout, unit, callback);
950         flush();
951     }
952 
953     private void generateAndEnqueueControlFrame(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
954     {
955         try
956         {
957             // Synchronization is necessary, since we may have concurrent replies
958             // and those needs to be generated and enqueued atomically in order
959             // to maintain a correct compression context
960             synchronized (this)
961             {
962                 ByteBuffer buffer = generator.control(frame);
963                 LOG.debug("Queuing {} on {}", frame, stream);
964                 ControlFrameBytes frameBytes = new ControlFrameBytes(stream, callback, frame, buffer);
965                 if (timeout > 0)
966                     frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
967 
968                 // Special handling for PING frames, they must be sent as soon as possible
969                 if (ControlFrameType.PING == frame.getType())
970                     prepend(frameBytes);
971                 else
972                     append(frameBytes);
973             }
974         }
975         catch (Exception x)
976         {
977             notifyCallbackFailed(callback, x);
978         }
979     }
980 
981     private void updateLastStreamId(IStream stream)
982     {
983         int streamId = stream.getId();
984         if (streamId % 2 != streamIds.get() % 2)
985             Atomics.updateMax(lastStreamId, streamId);
986     }
987 
988     @Override
989     public void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Callback callback)
990     {
991         LOG.debug("Queuing {} on {}", dataInfo, stream);
992         DataFrameBytes frameBytes = new DataFrameBytes(stream, callback, dataInfo);
993         if (timeout > 0)
994             frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
995         append(frameBytes);
996         flush();
997     }
998 
999     @Override
1000     public void shutdown()
1001     {
1002         FrameBytes frameBytes = new CloseFrameBytes();
1003         append(frameBytes);
1004         flush();
1005     }
1006 
1007     private void execute(Runnable task)
1008     {
1009         threadPool.execute(task);
1010     }
1011 
1012     @Override
1013     public void flush()
1014     {
1015         FrameBytes frameBytes = null;
1016         ByteBuffer buffer = null;
1017         boolean failFrameBytes = false;
1018         synchronized (queue)
1019         {
1020             if (flushing || queue.isEmpty())
1021                 return;
1022 
1023             Set<IStream> stalledStreams = null;
1024             for (int i = 0; i < queue.size(); ++i)
1025             {
1026                 frameBytes = queue.get(i);
1027 
1028                 IStream stream = frameBytes.getStream();
1029                 if (stream != null && stalledStreams != null && stalledStreams.contains(stream))
1030                     continue;
1031 
1032                 buffer = frameBytes.getByteBuffer();
1033                 if (buffer != null)
1034                 {
1035                     queue.remove(i);
1036                     if (stream != null && stream.isReset() && !(frameBytes instanceof ControlFrameBytes))
1037                         failFrameBytes = true;
1038                     break;
1039                 }
1040 
1041                 if (stalledStreams == null)
1042                     stalledStreams = new HashSet<>();
1043                 if (stream != null)
1044                     stalledStreams.add(stream);
1045 
1046                 LOG.debug("Flush stalled for {}, {} frame(s) in queue", frameBytes, queue.size());
1047             }
1048 
1049             if (buffer == null)
1050                 return;
1051 
1052             if (!failFrameBytes)
1053             {
1054                 flushing = true;
1055                 LOG.debug("Flushing {}, {} frame(s) in queue", frameBytes, queue.size());
1056             }
1057         }
1058         if (failFrameBytes)
1059         {
1060             frameBytes.fail(new StreamException(frameBytes.getStream().getId(), StreamStatus.INVALID_STREAM,
1061                     "Stream: " + frameBytes.getStream() + " is reset!"));
1062         }
1063         else
1064         {
1065             write(buffer, frameBytes);
1066         }
1067     }
1068 
1069     void append(FrameBytes frameBytes)
1070     {
1071         Throwable failure;
1072         synchronized (queue)
1073         {
1074             failure = this.failure;
1075             if (failure == null)
1076             {
1077                 // Frames containing headers must be send in the order the headers have been generated. We don't need
1078                 // to do this check in StandardSession.prepend() as no frames containing headers will be prepended.
1079                 if (frameBytes instanceof ControlFrameBytes)
1080                     queue.addLast(frameBytes);
1081                 else
1082                 {
1083                     int index = queue.size();
1084                     while (index > 0)
1085                     {
1086                         FrameBytes element = queue.get(index - 1);
1087                         if (element.compareTo(frameBytes) >= 0)
1088                             break;
1089                         --index;
1090                     }
1091                     queue.add(index, frameBytes);
1092                 }
1093             }
1094         }
1095 
1096         if (failure != null)
1097             frameBytes.fail(new SPDYException(failure));
1098     }
1099 
1100     private void prepend(FrameBytes frameBytes)
1101     {
1102         Throwable failure;
1103         synchronized (queue)
1104         {
1105             failure = this.failure;
1106             if (failure == null)
1107             {
1108                 int index = 0;
1109                 while (index < queue.size())
1110                 {
1111                     FrameBytes element = queue.get(index);
1112                     if (element.compareTo(frameBytes) <= 0)
1113                         break;
1114                     ++index;
1115                 }
1116                 queue.add(index, frameBytes);
1117             }
1118         }
1119 
1120         if (failure != null)
1121             frameBytes.fail(new SPDYException(failure));
1122     }
1123 
1124     protected void write(ByteBuffer buffer, Callback callback)
1125     {
1126         if (controller != null)
1127         {
1128             LOG.debug("Writing {} frame bytes of {}", buffer.remaining(), buffer.limit());
1129             controller.write(buffer, callback);
1130         }
1131     }
1132 
1133     private void complete(final Callback callback)
1134     {
1135         // Applications may send and queue up a lot of frames and
1136         // if we call Callback.completed() only synchronously we risk
1137         // starvation (for the last frames sent) and stack overflow.
1138         // Therefore every some invocation, we dispatch to a new thread
1139         invoker.invoke(callback);
1140     }
1141 
1142     private void notifyCallbackFailed(Callback callback, Throwable x)
1143     {
1144         try
1145         {
1146             if (callback != null)
1147                 callback.failed(x);
1148         }
1149         catch (Exception xx)
1150         {
1151             LOG.info("Exception while notifying callback " + callback, xx);
1152         }
1153         catch (Error xx)
1154         {
1155             LOG.info("Exception while notifying callback " + callback, xx);
1156             throw xx;
1157         }
1158     }
1159 
1160     public int getWindowSize()
1161     {
1162         return flowControlStrategy.getWindowSize(this);
1163     }
1164 
1165     public void setWindowSize(int initialWindowSize)
1166     {
1167         flowControlStrategy.setWindowSize(this, initialWindowSize);
1168     }
1169 
1170     @Override
1171     public String toString()
1172     {
1173         return String.format("%s@%x{v%d,queueSize=%d,windowSize=%d,streams=%d}", getClass().getSimpleName(),
1174                 hashCode(), version, queue.size(), getWindowSize(), streams.size());
1175     }
1176 
1177     @Override
1178     public String dump()
1179     {
1180         return ContainerLifeCycle.dump(this);
1181     }
1182 
1183     @Override
1184     public void dump(Appendable out, String indent) throws IOException
1185     {
1186         ContainerLifeCycle.dumpObject(out, this);
1187         ContainerLifeCycle.dump(out, indent, Collections.singletonList(controller), streams.values());
1188     }
1189 
1190     private class SessionInvoker extends ForkInvoker<Callback>
1191     {
1192         private SessionInvoker()
1193         {
1194             super(4);
1195         }
1196 
1197         @Override
1198         public void fork(final Callback callback)
1199         {
1200             execute(new Runnable()
1201             {
1202                 @Override
1203                 public void run()
1204                 {
1205                     callback.succeeded();
1206                     flush();
1207                 }
1208             });
1209         }
1210 
1211         @Override
1212         public void call(Callback callback)
1213         {
1214             callback.succeeded();
1215             flush();
1216         }
1217     }
1218 
1219     public interface FrameBytes extends Comparable<FrameBytes>, Callback
1220     {
1221         public IStream getStream();
1222 
1223         public abstract ByteBuffer getByteBuffer();
1224 
1225         public abstract void complete();
1226 
1227         public abstract void fail(Throwable throwable);
1228     }
1229 
1230     abstract class AbstractFrameBytes implements FrameBytes, Runnable
1231     {
1232         private final IStream stream;
1233         private final Callback callback;
1234         protected volatile Scheduler.Task task;
1235 
1236         protected AbstractFrameBytes(IStream stream, Callback callback)
1237         {
1238             this.stream = stream;
1239             this.callback = Objects.requireNonNull(callback);
1240         }
1241 
1242         @Override
1243         public IStream getStream()
1244         {
1245             return stream;
1246         }
1247 
1248         @Override
1249         public int compareTo(FrameBytes that)
1250         {
1251             // FrameBytes may have or not have a related stream (for example, PING do not have a related stream)
1252             // FrameBytes without related streams have higher priority
1253             IStream thisStream = getStream();
1254             IStream thatStream = that.getStream();
1255             if (thisStream == null)
1256                 return thatStream == null ? 0 : -1;
1257             if (thatStream == null)
1258                 return 1;
1259             // If this.stream.priority > that.stream.priority => this.stream has less priority than that.stream
1260             return thatStream.getPriority() - thisStream.getPriority();
1261         }
1262 
1263         @Override
1264         public void complete()
1265         {
1266             cancelTask();
1267             StandardSession.this.complete(callback);
1268         }
1269 
1270         @Override
1271         public void fail(Throwable x)
1272         {
1273             cancelTask();
1274             notifyCallbackFailed(callback, x);
1275             StandardSession.this.flush();
1276         }
1277 
1278         private void cancelTask()
1279         {
1280             Scheduler.Task task = this.task;
1281             if (task != null)
1282                 task.cancel();
1283         }
1284 
1285         @Override
1286         public void run()
1287         {
1288             close();
1289             fail(new InterruptedByTimeoutException());
1290         }
1291 
1292         @Override
1293         public void succeeded()
1294         {
1295             synchronized (queue)
1296             {
1297                 if (LOG.isDebugEnabled())
1298                     LOG.debug("Completed write of {}, {} frame(s) in queue", this, queue.size());
1299                 flushing = false;
1300             }
1301             complete();
1302         }
1303 
1304         @Override
1305         public void failed(Throwable x)
1306         {
1307             List<FrameBytes> frameBytesToFail = new ArrayList<>();
1308             frameBytesToFail.add(this);
1309 
1310             synchronized (queue)
1311             {
1312                 failure = x;
1313                 if (LOG.isDebugEnabled())
1314                 {
1315                     String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue", this, queue.size());
1316                     LOG.debug(logMessage, x);
1317                 }
1318                 frameBytesToFail.addAll(queue);
1319                 queue.clear();
1320                 flushing = false;
1321             }
1322 
1323             for (FrameBytes fb : frameBytesToFail)
1324                 fb.fail(x);
1325         }
1326     }
1327 
1328     private class ControlFrameBytes extends AbstractFrameBytes
1329     {
1330         private final ControlFrame frame;
1331         private final ByteBuffer buffer;
1332 
1333         private ControlFrameBytes(IStream stream, Callback callback, ControlFrame frame, ByteBuffer buffer)
1334         {
1335             super(stream, callback);
1336             this.frame = frame;
1337             this.buffer = buffer;
1338         }
1339 
1340         @Override
1341         public ByteBuffer getByteBuffer()
1342         {
1343             return buffer;
1344         }
1345 
1346         @Override
1347         public void complete()
1348         {
1349             bufferPool.release(buffer);
1350 
1351             super.complete();
1352 
1353             if (frame.getType() == ControlFrameType.GO_AWAY)
1354             {
1355                 // After sending a GO_AWAY we need to hard close the connection.
1356                 // Recipients will know the last good stream id and act accordingly.
1357                 close();
1358             }
1359             IStream stream = getStream();
1360             if (stream != null && stream.isClosed())
1361                 removeStream(stream);
1362         }
1363 
1364         @Override
1365         public String toString()
1366         {
1367             return frame.toString();
1368         }
1369     }
1370 
1371     private class DataFrameBytes extends AbstractFrameBytes
1372     {
1373         private final DataInfo dataInfo;
1374         private int size;
1375         private volatile ByteBuffer buffer;
1376 
1377         private DataFrameBytes(IStream stream, Callback handler, DataInfo dataInfo)
1378         {
1379             super(stream, handler);
1380             this.dataInfo = dataInfo;
1381         }
1382 
1383         @Override
1384         public ByteBuffer getByteBuffer()
1385         {
1386             try
1387             {
1388                 IStream stream = getStream();
1389                 int windowSize = stream.getWindowSize();
1390                 if (windowSize <= 0)
1391                     return null;
1392 
1393                 size = dataInfo.available();
1394                 if (size > windowSize)
1395                     size = windowSize;
1396 
1397                 buffer = generator.data(stream.getId(), size, dataInfo);
1398                 return buffer;
1399             }
1400             catch (Throwable x)
1401             {
1402                 fail(x);
1403                 return null;
1404             }
1405         }
1406 
1407         @Override
1408         public void complete()
1409         {
1410             bufferPool.release(buffer);
1411             IStream stream = getStream();
1412             dataInfo.consume(size);
1413             flowControlStrategy.updateWindow(StandardSession.this, stream, -size);
1414             if (dataInfo.available() > 0)
1415             {
1416                 // We have written a frame out of this DataInfo, but there is more to write.
1417                 // We need to keep the correct ordering of frames, to avoid that another
1418                 // DataInfo for the same stream is written before this one is finished.
1419                 prepend(this);
1420                 flush();
1421             }
1422             else
1423             {
1424                 super.complete();
1425                 stream.updateCloseState(dataInfo.isClose(), true);
1426                 if (stream.isClosed())
1427                     removeStream(stream);
1428             }
1429         }
1430 
1431         @Override
1432         public String toString()
1433         {
1434             return String.format("DATA bytes @%x available=%d consumed=%d on %s", dataInfo.hashCode(), dataInfo.available(), dataInfo.consumed(), getStream());
1435         }
1436     }
1437 
1438     private class CloseFrameBytes extends AbstractFrameBytes
1439     {
1440         private CloseFrameBytes()
1441         {
1442             super(null, new Callback.Adapter());
1443         }
1444 
1445         @Override
1446         public ByteBuffer getByteBuffer()
1447         {
1448             return BufferUtil.EMPTY_BUFFER;
1449         }
1450 
1451         @Override
1452         public void complete()
1453         {
1454             super.complete();
1455             close();
1456         }
1457     }
1458 
1459     private static class PingInfoCallback extends PingResultInfo implements Callback
1460     {
1461         private final Promise<PingResultInfo> promise;
1462 
1463         public PingInfoCallback(int pingId, Promise<PingResultInfo> promise)
1464         {
1465             super(pingId);
1466             this.promise = promise;
1467         }
1468 
1469         @Override
1470         public void succeeded()
1471         {
1472             if (promise != null)
1473                 promise.succeeded(this);
1474         }
1475 
1476         @Override
1477         public void failed(Throwable x)
1478         {
1479             if (promise != null)
1480                 promise.failed(x);
1481         }
1482     }
1483 }