View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy;
20  
21  import java.io.IOException;
22  import java.net.InetSocketAddress;
23  import java.nio.ByteBuffer;
24  import java.nio.channels.InterruptedByTimeoutException;
25  import java.util.ArrayList;
26  import java.util.Collections;
27  import java.util.HashSet;
28  import java.util.LinkedList;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Objects;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentMap;
35  import java.util.concurrent.CopyOnWriteArrayList;
36  import java.util.concurrent.ExecutionException;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.TimeoutException;
40  import java.util.concurrent.atomic.AtomicBoolean;
41  import java.util.concurrent.atomic.AtomicInteger;
42  
43  import org.eclipse.jetty.io.ByteBufferPool;
44  import org.eclipse.jetty.io.EndPoint;
45  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
46  import org.eclipse.jetty.spdy.api.DataInfo;
47  import org.eclipse.jetty.spdy.api.GoAwayInfo;
48  import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
49  import org.eclipse.jetty.spdy.api.PingInfo;
50  import org.eclipse.jetty.spdy.api.PingResultInfo;
51  import org.eclipse.jetty.spdy.api.PushInfo;
52  import org.eclipse.jetty.spdy.api.RstInfo;
53  import org.eclipse.jetty.spdy.api.SPDYException;
54  import org.eclipse.jetty.spdy.api.Session;
55  import org.eclipse.jetty.spdy.api.SessionFrameListener;
56  import org.eclipse.jetty.spdy.api.SessionStatus;
57  import org.eclipse.jetty.spdy.api.Settings;
58  import org.eclipse.jetty.spdy.api.SettingsInfo;
59  import org.eclipse.jetty.spdy.api.Stream;
60  import org.eclipse.jetty.spdy.api.StreamFrameListener;
61  import org.eclipse.jetty.spdy.api.StreamStatus;
62  import org.eclipse.jetty.spdy.api.SynInfo;
63  import org.eclipse.jetty.spdy.frames.ControlFrame;
64  import org.eclipse.jetty.spdy.frames.ControlFrameType;
65  import org.eclipse.jetty.spdy.frames.CredentialFrame;
66  import org.eclipse.jetty.spdy.frames.DataFrame;
67  import org.eclipse.jetty.spdy.frames.GoAwayFrame;
68  import org.eclipse.jetty.spdy.frames.HeadersFrame;
69  import org.eclipse.jetty.spdy.frames.PingFrame;
70  import org.eclipse.jetty.spdy.frames.RstStreamFrame;
71  import org.eclipse.jetty.spdy.frames.SettingsFrame;
72  import org.eclipse.jetty.spdy.frames.SynReplyFrame;
73  import org.eclipse.jetty.spdy.frames.SynStreamFrame;
74  import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
75  import org.eclipse.jetty.spdy.generator.Generator;
76  import org.eclipse.jetty.spdy.parser.Parser;
77  import org.eclipse.jetty.util.Atomics;
78  import org.eclipse.jetty.util.BufferUtil;
79  import org.eclipse.jetty.util.Callback;
80  import org.eclipse.jetty.util.ForkInvoker;
81  import org.eclipse.jetty.util.FutureCallback;
82  import org.eclipse.jetty.util.FuturePromise;
83  import org.eclipse.jetty.util.Promise;
84  import org.eclipse.jetty.util.component.ContainerLifeCycle;
85  import org.eclipse.jetty.util.component.Dumpable;
86  import org.eclipse.jetty.util.log.Log;
87  import org.eclipse.jetty.util.log.Logger;
88  import org.eclipse.jetty.util.thread.Scheduler;
89  
90  public class StandardSession implements ISession, Parser.Listener, Dumpable
91  {
    // Logger for this class; note that the category is Session, not StandardSession.
    private static final Logger LOG = Log.getLogger(Session.class);

    // NOTE(review): not used in the visible part of this file - presumably dispatches callbacks; confirm.
    private final ForkInvoker<Callback> invoker = new SessionInvoker();
    // Session attributes by key (see getAttribute/setAttribute/removeAttribute).
    private final Map<String, Object> attributes = new ConcurrentHashMap<>();
    // Listeners registered through addListener(); StreamListener instances are told about stream creation/closure.
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();
    // Streams currently known to the session, keyed by stream id.
    private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
    // NOTE(review): presumably the frames waiting to be written by flush(); not used in the visible part - confirm.
    private final LinkedList<FrameBytes> queue = new LinkedList<>();
    private final ByteBufferPool bufferPool;
    private final Executor threadPool;
    private final Scheduler scheduler;
    // SPDY version passed to every frame this session creates.
    private final short version;
    // Closed by close(); may be null (tests).
    private final Controller controller;
    // Source of the local and remote addresses.
    private final EndPoint endPoint;
    // Told "not idle" before and "idle" after each incoming frame is processed; may be null.
    private final IdleListener idleListener;
    // Next id for a locally created stream; advanced by 2 on each syn().
    private final AtomicInteger streamIds;
    // Next id for a locally sent ping; advanced by 2 on each ping(). Its parity identifies our own pings.
    private final AtomicInteger pingIds;
    // Application listener notified of session-level events; may be null.
    private final SessionFrameListener listener;
    // Serializes control frames into buffers.
    private final Generator generator;
    // Set once a GO_AWAY has been sent; incoming frames are skipped afterwards.
    private final AtomicBoolean goAwaySent = new AtomicBoolean();
    // Set once a GO_AWAY has been received from the peer.
    private final AtomicBoolean goAwayReceived = new AtomicBoolean();
    // Value reported as "last stream id" in the GO_AWAY frame we send.
    private final AtomicInteger lastStreamId = new AtomicInteger();
    private final FlowControlStrategy flowControlStrategy;
    // NOTE(review): the next two fields are not used in the visible part of this file - presumably flush state; confirm.
    private boolean flushing;
    private Throwable failure;
116 
117     public StandardSession(short version, ByteBufferPool bufferPool, Executor threadPool, Scheduler scheduler,
118                            Controller controller, EndPoint endPoint, IdleListener idleListener, int initialStreamId,
119                            SessionFrameListener listener, Generator generator, FlowControlStrategy flowControlStrategy)
120     {
121         // TODO this should probably be an aggregate lifecycle
122         this.version = version;
123         this.bufferPool = bufferPool;
124         this.threadPool = threadPool;
125         this.scheduler = scheduler;
126         this.controller = controller;
127         this.endPoint = endPoint;
128         this.idleListener = idleListener;
129         this.streamIds = new AtomicInteger(initialStreamId);
130         this.pingIds = new AtomicInteger(initialStreamId);
131         this.listener = listener;
132         this.generator = generator;
133         this.flowControlStrategy = flowControlStrategy;
134     }
135 
136     @Override
137     public short getVersion()
138     {
139         return version;
140     }
141 
142     @Override
143     public void addListener(Listener listener)
144     {
145         listeners.add(listener);
146     }
147 
148     @Override
149     public void removeListener(Listener listener)
150     {
151         listeners.remove(listener);
152     }
153 
154     @Override
155     public Stream syn(SynInfo synInfo, StreamFrameListener listener) throws ExecutionException, InterruptedException, TimeoutException
156     {
157         FuturePromise<Stream> result = new FuturePromise<>();
158         syn(synInfo, listener, result);
159         if (synInfo.getTimeout() > 0)
160             return result.get(synInfo.getTimeout(), synInfo.getUnit());
161         else
162             return result.get();
163     }
164 
165     @Override
166     public void syn(SynInfo synInfo, StreamFrameListener listener, Promise<Stream> promise)
167     {
168         // Synchronization is necessary.
169         // SPEC v3, 2.3.1 requires that the stream creation be monotonically crescent
170         // so we cannot allow thread1 to create stream1 and thread2 create stream3 and
171         // have stream3 hit the network before stream1, not only to comply with the spec
172         // but also because the compression context for the headers would be wrong, as the
173         // frame with a compression history will come before the first compressed frame.
174         int associatedStreamId = 0;
175         if (synInfo instanceof PushSynInfo)
176             associatedStreamId = ((PushSynInfo)synInfo).getAssociatedStreamId();
177 
178         synchronized (this)
179         {
180             int streamId = streamIds.getAndAdd(2);
181             // TODO: for SPDYv3 we need to support the "slot" argument
182             SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, associatedStreamId, synInfo.getPriority(), (short)0, synInfo.getHeaders());
183             IStream stream = createStream(synStream, listener, true, promise);
184             generateAndEnqueueControlFrame(stream, synStream, synInfo.getTimeout(), synInfo.getUnit(), stream);
185         }
186         flush();
187     }
188 
189     @Override
190     public void rst(RstInfo rstInfo) throws InterruptedException, ExecutionException, TimeoutException
191     {
192         FutureCallback result = new FutureCallback();
193         rst(rstInfo, result);
194         if (rstInfo.getTimeout() > 0)
195             result.get(rstInfo.getTimeout(), rstInfo.getUnit());
196         else
197             result.get();
198     }
199 
200     @Override
201     public void rst(RstInfo rstInfo, Callback callback)
202     {
203         // SPEC v3, 2.2.2
204         if (goAwaySent.get())
205         {
206             complete(callback);
207         }
208         else
209         {
210             int streamId = rstInfo.getStreamId();
211             IStream stream = streams.get(streamId);
212             RstStreamFrame frame = new RstStreamFrame(version, streamId, rstInfo.getStreamStatus().getCode(version));
213             control(stream, frame, rstInfo.getTimeout(), rstInfo.getUnit(), callback);
214             if (stream != null)
215             {
216                 stream.process(frame);
217                 removeStream(stream);
218             }
219         }
220     }
221 
222     @Override
223     public void settings(SettingsInfo settingsInfo) throws ExecutionException, InterruptedException, TimeoutException
224     {
225         FutureCallback result = new FutureCallback();
226         settings(settingsInfo, result);
227         if (settingsInfo.getTimeout() > 0)
228             result.get(settingsInfo.getTimeout(), settingsInfo.getUnit());
229         else
230             result.get();
231     }
232 
233     @Override
234     public void settings(SettingsInfo settingsInfo, Callback callback)
235     {
236         SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings());
237         control(null, frame, settingsInfo.getTimeout(), settingsInfo.getUnit(), callback);
238     }
239 
240     @Override
241     public PingResultInfo ping(PingInfo pingInfo) throws ExecutionException, InterruptedException, TimeoutException
242     {
243         //TODO: find a better name for PingResultInfo
244         FuturePromise<PingResultInfo> result = new FuturePromise<>();
245         ping(pingInfo, result);
246         if (pingInfo.getTimeout() > 0)
247             return result.get(pingInfo.getTimeout(), pingInfo.getUnit());
248         else
249             return result.get();
250     }
251 
252     @Override
253     public void ping(PingInfo pingInfo, Promise<PingResultInfo> promise)
254     {
255         int pingId = pingIds.getAndAdd(2);
256         PingInfoCallback pingInfoCallback = new PingInfoCallback(pingId, promise);
257         PingFrame frame = new PingFrame(version, pingId);
258         control(null, frame, pingInfo.getTimeout(), pingInfo.getUnit(), pingInfoCallback);
259     }
260 
261     @Override
262     public void goAway(GoAwayInfo goAwayInfo) throws ExecutionException, InterruptedException, TimeoutException
263     {
264         goAway(goAwayInfo, SessionStatus.OK);
265     }
266 
267     private void goAway(GoAwayInfo goAwayInfo, SessionStatus sessionStatus) throws ExecutionException, InterruptedException, TimeoutException
268     {
269         FutureCallback result = new FutureCallback();
270         goAway(sessionStatus, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), result);
271         if (goAwayInfo.getTimeout() > 0)
272             result.get(goAwayInfo.getTimeout(), goAwayInfo.getUnit());
273         else
274             result.get();
275     }
276 
277     @Override
278     public void goAway(GoAwayInfo goAwayInfo, Callback callback)
279     {
280         goAway(SessionStatus.OK, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), callback);
281     }
282 
283     private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Callback callback)
284     {
285         if (goAwaySent.compareAndSet(false, true))
286         {
287             if (!goAwayReceived.get())
288             {
289                 GoAwayFrame frame = new GoAwayFrame(version, lastStreamId.get(), sessionStatus.getCode());
290                 control(null, frame, timeout, unit, callback);
291                 return;
292             }
293         }
294         complete(callback);
295     }
296 
297     @Override
298     public Set<Stream> getStreams()
299     {
300         Set<Stream> result = new HashSet<>();
301         result.addAll(streams.values());
302         return result;
303     }
304 
305     @Override
306     public IStream getStream(int streamId)
307     {
308         return streams.get(streamId);
309     }
310 
311     @Override
312     public Object getAttribute(String key)
313     {
314         return attributes.get(key);
315     }
316 
317     @Override
318     public void setAttribute(String key, Object value)
319     {
320         attributes.put(key, value);
321     }
322 
323     @Override
324     public Object removeAttribute(String key)
325     {
326         return attributes.remove(key);
327     }
328 
329     @Override
330     public InetSocketAddress getLocalAddress()
331     {
332         return endPoint.getLocalAddress();
333     }
334 
335     @Override
336     public InetSocketAddress getRemoteAddress()
337     {
338         return endPoint.getRemoteAddress();
339     }
340 
341     @Override
342     public void onControlFrame(ControlFrame frame)
343     {
344         notifyIdle(idleListener, false);
345         try
346         {
347             LOG.debug("Processing {}", frame);
348 
349             if (goAwaySent.get())
350             {
351                 LOG.debug("Skipped processing of {}", frame);
352                 return;
353             }
354 
355             switch (frame.getType())
356             {
357                 case SYN_STREAM:
358                 {
359                     onSyn((SynStreamFrame)frame);
360                     break;
361                 }
362                 case SYN_REPLY:
363                 {
364                     onReply((SynReplyFrame)frame);
365                     break;
366                 }
367                 case RST_STREAM:
368                 {
369                     onRst((RstStreamFrame)frame);
370                     break;
371                 }
372                 case SETTINGS:
373                 {
374                     onSettings((SettingsFrame)frame);
375                     break;
376                 }
377                 case NOOP:
378                 {
379                     // Just ignore it
380                     break;
381                 }
382                 case PING:
383                 {
384                     onPing((PingFrame)frame);
385                     break;
386                 }
387                 case GO_AWAY:
388                 {
389                     onGoAway((GoAwayFrame)frame);
390                     break;
391                 }
392                 case HEADERS:
393                 {
394                     onHeaders((HeadersFrame)frame);
395                     break;
396                 }
397                 case WINDOW_UPDATE:
398                 {
399                     onWindowUpdate((WindowUpdateFrame)frame);
400                     break;
401                 }
402                 case CREDENTIAL:
403                 {
404                     onCredential((CredentialFrame)frame);
405                     break;
406                 }
407                 default:
408                 {
409                     throw new IllegalStateException();
410                 }
411             }
412         }
413         finally
414         {
415             notifyIdle(idleListener, true);
416         }
417     }
418 
419     @Override
420     public void onDataFrame(DataFrame frame, ByteBuffer data)
421     {
422         notifyIdle(idleListener, false);
423         try
424         {
425             LOG.debug("Processing {}, {} data bytes", frame, data.remaining());
426 
427             if (goAwaySent.get())
428             {
429                 LOG.debug("Skipped processing of {}", frame);
430                 return;
431             }
432 
433             int streamId = frame.getStreamId();
434             IStream stream = streams.get(streamId);
435             if (stream == null)
436             {
437                 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
438                 LOG.debug("Unknown stream {}", rstInfo);
439                 rst(rstInfo, new Callback.Adapter());
440             }
441             else
442             {
443                 processData(stream, frame, data);
444             }
445         }
446         finally
447         {
448             notifyIdle(idleListener, true);
449         }
450     }
451 
452     private void notifyIdle(IdleListener listener, boolean idle)
453     {
454         if (listener != null)
455             listener.onIdle(idle);
456     }
457 
458     private void processData(final IStream stream, DataFrame frame, ByteBuffer data)
459     {
460         ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data, frame.isClose())
461         {
462             @Override
463             public void consume(int delta)
464             {
465                 super.consume(delta);
466                 flowControlStrategy.onDataConsumed(StandardSession.this, stream, this, delta);
467             }
468         };
469         flowControlStrategy.onDataReceived(this, stream, dataInfo);
470         stream.process(dataInfo);
471         if (stream.isClosed())
472             removeStream(stream);
473     }
474 
475     @Override
476     public void onStreamException(StreamException x)
477     {
478         notifyOnException(listener, x);
479         rst(new RstInfo(x.getStreamId(), x.getStreamStatus()), new Callback.Adapter());
480     }
481 
482     @Override
483     public void onSessionException(SessionException x)
484     {
485         Throwable cause = x.getCause();
486         notifyOnException(listener, cause == null ? x : cause);
487         goAway(x.getSessionStatus(), 0, TimeUnit.SECONDS, new Callback.Adapter());
488     }
489 
490     private void onSyn(SynStreamFrame frame)
491     {
492         IStream stream = createStream(frame, null, false, null);
493         if (stream != null)
494             processSyn(listener, stream, frame);
495     }
496 
497     private void processSyn(SessionFrameListener listener, IStream stream, SynStreamFrame frame)
498     {
499         stream.process(frame);
500         // Update the last stream id before calling the application (which may send a GO_AWAY)
501         updateLastStreamId(stream);
502         StreamFrameListener streamListener;
503         if (stream.isUnidirectional())
504         {
505             PushInfo pushInfo = new PushInfo(frame.getHeaders(), frame.isClose());
506             streamListener = notifyOnPush(stream.getAssociatedStream().getStreamFrameListener(), stream, pushInfo);
507         }
508         else
509         {
510             SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority());
511             streamListener = notifyOnSyn(listener, stream, synInfo);
512         }
513         stream.setStreamFrameListener(streamListener);
514         flush();
515         // The onSyn() listener may have sent a frame that closed the stream
516         if (stream.isClosed())
517             removeStream(stream);
518     }
519 
520     private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local, Promise<Stream> promise)
521     {
522         IStream associatedStream = streams.get(frame.getAssociatedStreamId());
523         IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream, promise);
524         flowControlStrategy.onNewStream(this, stream);
525 
526         stream.updateCloseState(frame.isClose(), local);
527         stream.setStreamFrameListener(listener);
528 
529         if (stream.isUnidirectional())
530         {
531             // Unidirectional streams are implicitly half closed
532             stream.updateCloseState(true, !local);
533             if (!stream.isClosed())
534                 stream.getAssociatedStream().associate(stream);
535         }
536 
537         int streamId = stream.getId();
538         if (streams.putIfAbsent(streamId, stream) != null)
539         {
540             if (local)
541                 throw new IllegalStateException("Duplicate stream id " + streamId);
542             RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR);
543             LOG.debug("Duplicate stream, {}", rstInfo);
544             try
545             {
546                 rst(rstInfo);
547             }
548             catch (InterruptedException | ExecutionException | TimeoutException e)
549             {
550                 e.printStackTrace(); // TODO: really catch???
551             }
552             return null;
553         }
554         else
555         {
556             LOG.debug("Created {}", stream);
557             if (local)
558                 notifyStreamCreated(stream);
559             return stream;
560         }
561     }
562 
563     private void notifyStreamCreated(IStream stream)
564     {
565         for (Listener listener : listeners)
566         {
567             if (listener instanceof StreamListener)
568             {
569                 try
570                 {
571                     ((StreamListener)listener).onStreamCreated(stream);
572                 }
573                 catch (Exception x)
574                 {
575                     LOG.info("Exception while notifying listener " + listener, x);
576                 }
577                 catch (Error x)
578                 {
579                     LOG.info("Exception while notifying listener " + listener, x);
580                     throw x;
581                 }
582             }
583         }
584     }
585 
586     private void removeStream(IStream stream)
587     {
588         if (stream.isUnidirectional())
589             stream.getAssociatedStream().disassociate(stream);
590 
591         IStream removed = streams.remove(stream.getId());
592         if (removed != null)
593             assert removed == stream;
594 
595         LOG.debug("Removed {}", stream);
596         notifyStreamClosed(stream);
597     }
598 
599     private void notifyStreamClosed(IStream stream)
600     {
601         for (Listener listener : listeners)
602         {
603             if (listener instanceof StreamListener)
604             {
605                 try
606                 {
607                     ((StreamListener)listener).onStreamClosed(stream);
608                 }
609                 catch (Exception x)
610                 {
611                     LOG.info("Exception while notifying listener " + listener, x);
612                 }
613                 catch (Error x)
614                 {
615                     LOG.info("Exception while notifying listener " + listener, x);
616                     throw x;
617                 }
618             }
619         }
620     }
621 
622     private void onReply(SynReplyFrame frame)
623     {
624         int streamId = frame.getStreamId();
625         IStream stream = streams.get(streamId);
626         if (stream == null)
627         {
628             RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
629             LOG.debug("Unknown stream {}", rstInfo);
630             rst(rstInfo, new Callback.Adapter());
631         }
632         else
633         {
634             processReply(stream, frame);
635         }
636     }
637 
638     private void processReply(IStream stream, SynReplyFrame frame)
639     {
640         stream.process(frame);
641         if (stream.isClosed())
642             removeStream(stream);
643     }
644 
645     private void onRst(RstStreamFrame frame)
646     {
647         IStream stream = streams.get(frame.getStreamId());
648 
649         if (stream != null)
650             stream.process(frame);
651 
652         RstInfo rstInfo = new RstInfo(frame.getStreamId(), StreamStatus.from(frame.getVersion(), frame.getStatusCode()));
653         notifyOnRst(listener, rstInfo);
654         flush();
655 
656         if (stream != null)
657             removeStream(stream);
658     }
659 
660     private void onSettings(SettingsFrame frame)
661     {
662         Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
663         if (windowSizeSetting != null)
664         {
665             int windowSize = windowSizeSetting.value();
666             setWindowSize(windowSize);
667             LOG.debug("Updated session window size to {}", windowSize);
668         }
669         SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted());
670         notifyOnSettings(listener, settingsInfo);
671         flush();
672     }
673 
674     private void onPing(PingFrame frame)
675     {
676         int pingId = frame.getPingId();
677         if (pingId % 2 == pingIds.get() % 2)
678         {
679             PingResultInfo pingResultInfo = new PingResultInfo(frame.getPingId());
680             notifyOnPing(listener, pingResultInfo);
681             flush();
682         }
683         else
684         {
685             control(null, frame, 0, TimeUnit.MILLISECONDS, new Callback.Adapter());
686         }
687     }
688 
689     private void onGoAway(GoAwayFrame frame)
690     {
691         if (goAwayReceived.compareAndSet(false, true))
692         {
693             //TODO: Find a better name for GoAwayResultInfo
694             GoAwayResultInfo goAwayResultInfo = new GoAwayResultInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode()));
695             notifyOnGoAway(listener, goAwayResultInfo);
696             flush();
697             // SPDY does not require to send back a response to a GO_AWAY.
698             // We notified the application of the last good stream id and
699             // tried our best to flush remaining data.
700         }
701     }
702 
703     private void onHeaders(HeadersFrame frame)
704     {
705         int streamId = frame.getStreamId();
706         IStream stream = streams.get(streamId);
707         if (stream == null)
708         {
709             RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
710             LOG.debug("Unknown stream, {}", rstInfo);
711             rst(rstInfo, new Callback.Adapter());
712         }
713         else
714         {
715             processHeaders(stream, frame);
716         }
717     }
718 
719     private void processHeaders(IStream stream, HeadersFrame frame)
720     {
721         stream.process(frame);
722         if (stream.isClosed())
723             removeStream(stream);
724     }
725 
726     private void onWindowUpdate(WindowUpdateFrame frame)
727     {
728         int streamId = frame.getStreamId();
729         IStream stream = streams.get(streamId);
730         flowControlStrategy.onWindowUpdate(this, stream, frame.getWindowDelta());
731         flush();
732     }
733 
734     private void onCredential(CredentialFrame frame)
735     {
736         LOG.warn("{} frame not yet supported", frame.getType());
737         flush();
738     }
739 
740     protected void close()
741     {
742         // Check for null to support tests
743         if (controller != null)
744             controller.close(false);
745     }
746 
747     private void notifyOnException(SessionFrameListener listener, Throwable x)
748     {
749         try
750         {
751             if (listener != null)
752             {
753                 LOG.debug("Invoking callback with {} on listener {}", x, listener);
754                 listener.onException(x);
755             }
756         }
757         catch (Exception xx)
758         {
759             LOG.info("Exception while notifying listener " + listener, xx);
760         }
761         catch (Error xx)
762         {
763             LOG.info("Exception while notifying listener " + listener, xx);
764             throw xx;
765         }
766     }
767 
768     private StreamFrameListener notifyOnPush(StreamFrameListener listener, Stream stream, PushInfo pushInfo)
769     {
770         try
771         {
772             if (listener == null)
773                 return null;
774             LOG.debug("Invoking callback with {} on listener {}", pushInfo, listener);
775             return listener.onPush(stream, pushInfo);
776         }
777         catch (Exception x)
778         {
779             LOG.info("Exception while notifying listener " + listener, x);
780             return null;
781         }
782         catch (Error x)
783         {
784             LOG.info("Exception while notifying listener " + listener, x);
785             throw x;
786         }
787     }
788 
789     private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo)
790     {
791         try
792         {
793             if (listener == null)
794                 return null;
795             LOG.debug("Invoking callback with {} on listener {}", synInfo, listener);
796             return listener.onSyn(stream, synInfo);
797         }
798         catch (Exception x)
799         {
800             LOG.info("Exception while notifying listener " + listener, x);
801             return null;
802         }
803         catch (Error x)
804         {
805             LOG.info("Exception while notifying listener " + listener, x);
806             throw x;
807         }
808     }
809 
810     private void notifyOnRst(SessionFrameListener listener, RstInfo rstInfo)
811     {
812         try
813         {
814             if (listener != null)
815             {
816                 LOG.debug("Invoking callback with {} on listener {}", rstInfo, listener);
817                 listener.onRst(this, rstInfo);
818             }
819         }
820         catch (Exception x)
821         {
822             LOG.info("Exception while notifying listener " + listener, x);
823         }
824         catch (Error x)
825         {
826             LOG.info("Exception while notifying listener " + listener, x);
827             throw x;
828         }
829     }
830 
831     private void notifyOnSettings(SessionFrameListener listener, SettingsInfo settingsInfo)
832     {
833         try
834         {
835             if (listener != null)
836             {
837                 LOG.debug("Invoking callback with {} on listener {}", settingsInfo, listener);
838                 listener.onSettings(this, settingsInfo);
839             }
840         }
841         catch (Exception x)
842         {
843             LOG.info("Exception while notifying listener " + listener, x);
844         }
845         catch (Error x)
846         {
847             LOG.info("Exception while notifying listener " + listener, x);
848             throw x;
849         }
850     }
851 
852     private void notifyOnPing(SessionFrameListener listener, PingResultInfo pingResultInfo)
853     {
854         try
855         {
856             if (listener != null)
857             {
858                 LOG.debug("Invoking callback with {} on listener {}", pingResultInfo, listener);
859                 listener.onPing(this, pingResultInfo);
860             }
861         }
862         catch (Exception x)
863         {
864             LOG.info("Exception while notifying listener " + listener, x);
865         }
866         catch (Error x)
867         {
868             LOG.info("Exception while notifying listener " + listener, x);
869             throw x;
870         }
871     }
872 
873     private void notifyOnGoAway(SessionFrameListener listener, GoAwayResultInfo goAwayResultInfo)
874     {
875         try
876         {
877             if (listener != null)
878             {
879                 LOG.debug("Invoking callback with {} on listener {}", goAwayResultInfo, listener);
880                 listener.onGoAway(this, goAwayResultInfo);
881             }
882         }
883         catch (Exception x)
884         {
885             LOG.info("Exception while notifying listener " + listener, x);
886         }
887         catch (Error x)
888         {
889             LOG.info("Exception while notifying listener " + listener, x);
890             throw x;
891         }
892     }
893 
894 
895     @Override
896     public void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
897     {
898         generateAndEnqueueControlFrame(stream, frame, timeout, unit, callback);
899         flush();
900     }
901 
902     private void generateAndEnqueueControlFrame(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
903     {
904         try
905         {
906             // Synchronization is necessary, since we may have concurrent replies
907             // and those needs to be generated and enqueued atomically in order
908             // to maintain a correct compression context
909             synchronized (this)
910             {
911                 ByteBuffer buffer = generator.control(frame);
912                 LOG.debug("Queuing {} on {}", frame, stream);
913                 ControlFrameBytes frameBytes = new ControlFrameBytes(stream, callback, frame, buffer);
914                 if (timeout > 0)
915                     frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
916 
917                 // Special handling for PING frames, they must be sent as soon as possible
918                 if (ControlFrameType.PING == frame.getType())
919                     prepend(frameBytes);
920                 else
921                     append(frameBytes);
922             }
923         }
924         catch (Exception x)
925         {
926             notifyCallbackFailed(callback, x);
927         }
928     }
929 
930     private void updateLastStreamId(IStream stream)
931     {
932         int streamId = stream.getId();
933         if (streamId % 2 != streamIds.get() % 2)
934             Atomics.updateMax(lastStreamId, streamId);
935     }
936 
937     @Override
938     public void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Callback callback)
939     {
940         LOG.debug("Queuing {} on {}", dataInfo, stream);
941         DataFrameBytes frameBytes = new DataFrameBytes(stream, callback, dataInfo);
942         if (timeout > 0)
943             frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
944         append(frameBytes);
945         flush();
946     }
947 
948     @Override
949     public void shutdown()
950     {
951         FrameBytes frameBytes = new CloseFrameBytes();
952         append(frameBytes);
953         flush();
954     }
955 
956     private void execute(Runnable task)
957     {
958         threadPool.execute(task);
959     }
960 
961     @Override
962     public void flush()
963     {
964         FrameBytes frameBytes = null;
965         ByteBuffer buffer = null;
966         synchronized (queue)
967         {
968             if (flushing || queue.isEmpty())
969                 return;
970 
971             Set<IStream> stalledStreams = null;
972             for (int i = 0; i < queue.size(); ++i)
973             {
974                 frameBytes = queue.get(i);
975 
976                 IStream stream = frameBytes.getStream();
977                 if (stream != null && stalledStreams != null && stalledStreams.contains(stream))
978                     continue;
979 
980                 buffer = frameBytes.getByteBuffer();
981                 if (buffer != null)
982                 {
983                     queue.remove(i);
984                     if (stream != null && stream.isReset() && !(frameBytes instanceof ControlFrameBytes))
985                     {
986                         frameBytes.fail(new StreamException(stream.getId(), StreamStatus.INVALID_STREAM,
987                                 "Stream: " + stream + " is reset!"));
988                         return;
989                     }
990                     break;
991                 }
992 
993                 if (stalledStreams == null)
994                     stalledStreams = new HashSet<>();
995                 if (stream != null)
996                     stalledStreams.add(stream);
997 
998                 LOG.debug("Flush stalled for {}, {} frame(s) in queue", frameBytes, queue.size());
999             }
1000 
1001             if (buffer == null)
1002                 return;
1003 
1004             flushing = true;
1005             LOG.debug("Flushing {}, {} frame(s) in queue", frameBytes, queue.size());
1006         }
1007         write(buffer, frameBytes);
1008     }
1009 
1010     private void append(FrameBytes frameBytes)
1011     {
1012         Throwable failure;
1013         synchronized (queue)
1014         {
1015             failure = this.failure;
1016             if (failure == null)
1017             {
1018                 // Frames containing headers must be send in the order the headers have been generated. We don't need
1019                 // to do this check in StandardSession.prepend() as no frames containing headers will be prepended.
1020                 if (frameBytes instanceof ControlFrameBytes)
1021                     queue.addLast(frameBytes);
1022                 else
1023                 {
1024                     int index = queue.size();
1025                     while (index > 0)
1026                     {
1027                         FrameBytes element = queue.get(index - 1);
1028                         if (element.compareTo(frameBytes) >= 0)
1029                             break;
1030                         --index;
1031                     }
1032                     queue.add(index, frameBytes);
1033                 }
1034             }
1035         }
1036 
1037         if (failure != null)
1038             frameBytes.fail(new SPDYException(failure));
1039     }
1040 
1041     private void prepend(FrameBytes frameBytes)
1042     {
1043         Throwable failure;
1044         synchronized (queue)
1045         {
1046             failure = this.failure;
1047             if (failure == null)
1048             {
1049                 int index = 0;
1050                 while (index < queue.size())
1051                 {
1052                     FrameBytes element = queue.get(index);
1053                     if (element.compareTo(frameBytes) <= 0)
1054                         break;
1055                     ++index;
1056                 }
1057                 queue.add(index, frameBytes);
1058             }
1059         }
1060 
1061         if (failure != null)
1062             frameBytes.fail(new SPDYException(failure));
1063     }
1064 
1065     protected void write(ByteBuffer buffer, Callback callback)
1066     {
1067         if (controller != null)
1068         {
1069             LOG.debug("Writing {} frame bytes of {}", buffer.remaining(), buffer.limit());
1070             controller.write(buffer, callback);
1071         }
1072     }
1073 
1074     private void complete(final Callback callback)
1075     {
1076         // Applications may send and queue up a lot of frames and
1077         // if we call Callback.completed() only synchronously we risk
1078         // starvation (for the last frames sent) and stack overflow.
1079         // Therefore every some invocation, we dispatch to a new thread
1080         invoker.invoke(callback);
1081     }
1082 
1083     private void notifyCallbackFailed(Callback callback, Throwable x)
1084     {
1085         try
1086         {
1087             if (callback != null)
1088                 callback.failed(x);
1089         }
1090         catch (Exception xx)
1091         {
1092             LOG.info("Exception while notifying callback " + callback, xx);
1093         }
1094         catch (Error xx)
1095         {
1096             LOG.info("Exception while notifying callback " + callback, xx);
1097             throw xx;
1098         }
1099     }
1100 
1101     public int getWindowSize()
1102     {
1103         return flowControlStrategy.getWindowSize(this);
1104     }
1105 
1106     public void setWindowSize(int initialWindowSize)
1107     {
1108         flowControlStrategy.setWindowSize(this, initialWindowSize);
1109     }
1110 
1111     @Override
1112     public String toString()
1113     {
1114         return String.format("%s@%x{v%d,queueSize=%d,windowSize=%d,streams=%d}", getClass().getSimpleName(),
1115                 hashCode(), version, queue.size(), getWindowSize(), streams.size());
1116     }
1117 
1118     @Override
1119     public String dump()
1120     {
1121         return ContainerLifeCycle.dump(this);
1122     }
1123 
1124     @Override
1125     public void dump(Appendable out, String indent) throws IOException
1126     {
1127         ContainerLifeCycle.dumpObject(out, this);
1128         ContainerLifeCycle.dump(out, indent, Collections.singletonList(controller), streams.values());
1129     }
1130 
1131     private class SessionInvoker extends ForkInvoker<Callback>
1132     {
1133         private SessionInvoker()
1134         {
1135             super(4);
1136         }
1137 
1138         @Override
1139         public void fork(final Callback callback)
1140         {
1141             execute(new Runnable()
1142             {
1143                 @Override
1144                 public void run()
1145                 {
1146                     callback.succeeded();
1147                     flush();
1148                 }
1149             });
1150         }
1151 
1152         @Override
1153         public void call(Callback callback)
1154         {
1155             callback.succeeded();
1156             flush();
1157         }
1158     }
1159 
1160     public interface FrameBytes extends Comparable<FrameBytes>, Callback
1161     {
1162         public IStream getStream();
1163 
1164         public abstract ByteBuffer getByteBuffer();
1165 
1166         public abstract void complete();
1167 
1168         public abstract void fail(Throwable throwable);
1169     }
1170 
1171     private abstract class AbstractFrameBytes implements FrameBytes, Runnable
1172     {
1173         private final IStream stream;
1174         private final Callback callback;
1175         protected volatile Scheduler.Task task;
1176 
1177         protected AbstractFrameBytes(IStream stream, Callback callback)
1178         {
1179             this.stream = stream;
1180             this.callback = Objects.requireNonNull(callback);
1181         }
1182 
1183         @Override
1184         public IStream getStream()
1185         {
1186             return stream;
1187         }
1188 
1189         @Override
1190         public int compareTo(FrameBytes that)
1191         {
1192             // FrameBytes may have or not have a related stream (for example, PING do not have a related stream)
1193             // FrameBytes without related streams have higher priority
1194             IStream thisStream = getStream();
1195             IStream thatStream = that.getStream();
1196             if (thisStream == null)
1197                 return thatStream == null ? 0 : -1;
1198             if (thatStream == null)
1199                 return 1;
1200             // If this.stream.priority > that.stream.priority => this.stream has less priority than that.stream
1201             return thatStream.getPriority() - thisStream.getPriority();
1202         }
1203 
1204         @Override
1205         public void complete()
1206         {
1207             cancelTask();
1208             StandardSession.this.complete(callback);
1209         }
1210 
1211         @Override
1212         public void fail(Throwable x)
1213         {
1214             cancelTask();
1215             notifyCallbackFailed(callback, x);
1216             StandardSession.this.flush();
1217         }
1218 
1219         private void cancelTask()
1220         {
1221             Scheduler.Task task = this.task;
1222             if (task != null)
1223                 task.cancel();
1224         }
1225 
1226         @Override
1227         public void run()
1228         {
1229             close();
1230             fail(new InterruptedByTimeoutException());
1231         }
1232 
1233         @Override
1234         public void succeeded()
1235         {
1236             synchronized (queue)
1237             {
1238                 if (LOG.isDebugEnabled())
1239                     LOG.debug("Completed write of {}, {} frame(s) in queue", this, queue.size());
1240                 flushing = false;
1241             }
1242             complete();
1243         }
1244 
1245         @Override
1246         public void failed(Throwable x)
1247         {
1248             List<FrameBytes> frameBytesToFail = new ArrayList<>();
1249             frameBytesToFail.add(this);
1250 
1251             synchronized (queue)
1252             {
1253                 failure = x;
1254                 if (LOG.isDebugEnabled())
1255                 {
1256                     String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue", this, queue.size());
1257                     LOG.debug(logMessage, x);
1258                 }
1259                 frameBytesToFail.addAll(queue);
1260                 queue.clear();
1261                 flushing = false;
1262             }
1263 
1264             for (FrameBytes fb : frameBytesToFail)
1265                 fb.fail(x);
1266         }
1267     }
1268 
1269     private class ControlFrameBytes extends AbstractFrameBytes
1270     {
1271         private final ControlFrame frame;
1272         private final ByteBuffer buffer;
1273 
1274         private ControlFrameBytes(IStream stream, Callback callback, ControlFrame frame, ByteBuffer buffer)
1275         {
1276             super(stream, callback);
1277             this.frame = frame;
1278             this.buffer = buffer;
1279         }
1280 
1281         @Override
1282         public ByteBuffer getByteBuffer()
1283         {
1284             return buffer;
1285         }
1286 
1287         @Override
1288         public void complete()
1289         {
1290             bufferPool.release(buffer);
1291 
1292             super.complete();
1293 
1294             if (frame.getType() == ControlFrameType.GO_AWAY)
1295             {
1296                 // After sending a GO_AWAY we need to hard close the connection.
1297                 // Recipients will know the last good stream id and act accordingly.
1298                 close();
1299             }
1300             IStream stream = getStream();
1301             if (stream != null && stream.isClosed())
1302                 removeStream(stream);
1303         }
1304 
1305         @Override
1306         public String toString()
1307         {
1308             return frame.toString();
1309         }
1310     }
1311 
1312     private class DataFrameBytes extends AbstractFrameBytes
1313     {
1314         private final DataInfo dataInfo;
1315         private int size;
1316         private volatile ByteBuffer buffer;
1317 
1318         private DataFrameBytes(IStream stream, Callback handler, DataInfo dataInfo)
1319         {
1320             super(stream, handler);
1321             this.dataInfo = dataInfo;
1322         }
1323 
1324         @Override
1325         public ByteBuffer getByteBuffer()
1326         {
1327             try
1328             {
1329                 IStream stream = getStream();
1330                 int windowSize = stream.getWindowSize();
1331                 if (windowSize <= 0)
1332                     return null;
1333 
1334                 size = dataInfo.available();
1335                 if (size > windowSize)
1336                     size = windowSize;
1337 
1338                 buffer = generator.data(stream.getId(), size, dataInfo);
1339                 return buffer;
1340             }
1341             catch (Throwable x)
1342             {
1343                 fail(x);
1344                 return null;
1345             }
1346         }
1347 
1348         @Override
1349         public void complete()
1350         {
1351             bufferPool.release(buffer);
1352             IStream stream = getStream();
1353             flowControlStrategy.updateWindow(StandardSession.this, stream, -size);
1354             if (dataInfo.available() > 0)
1355             {
1356                 // We have written a frame out of this DataInfo, but there is more to write.
1357                 // We need to keep the correct ordering of frames, to avoid that another
1358                 // DataInfo for the same stream is written before this one is finished.
1359                 prepend(this);
1360                 flush();
1361             }
1362             else
1363             {
1364                 super.complete();
1365                 stream.updateCloseState(dataInfo.isClose(), true);
1366                 if (stream.isClosed())
1367                     removeStream(stream);
1368             }
1369         }
1370 
1371         @Override
1372         public String toString()
1373         {
1374             return String.format("DATA bytes @%x available=%d consumed=%d on %s", dataInfo.hashCode(), dataInfo.available(), dataInfo.consumed(), getStream());
1375         }
1376     }
1377 
1378     private class CloseFrameBytes extends AbstractFrameBytes
1379     {
1380         private CloseFrameBytes()
1381         {
1382             super(null, new Callback.Adapter());
1383         }
1384 
1385         @Override
1386         public ByteBuffer getByteBuffer()
1387         {
1388             return BufferUtil.EMPTY_BUFFER;
1389         }
1390 
1391         @Override
1392         public void complete()
1393         {
1394             super.complete();
1395             close();
1396         }
1397     }
1398 
1399     private static class PingInfoCallback extends PingResultInfo implements Callback
1400     {
1401         private final Promise<PingResultInfo> promise;
1402 
1403         public PingInfoCallback(int pingId, Promise<PingResultInfo> promise)
1404         {
1405             super(pingId);
1406             this.promise = promise;
1407         }
1408 
1409         @Override
1410         public void succeeded()
1411         {
1412             if (promise != null)
1413                 promise.succeeded(this);
1414         }
1415 
1416         @Override
1417         public void failed(Throwable x)
1418         {
1419             if (promise != null)
1420                 promise.failed(x);
1421         }
1422     }
1423 }