View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.InterruptedByTimeoutException;
24  import java.util.ArrayList;
25  import java.util.Collections;
26  import java.util.HashSet;
27  import java.util.LinkedList;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  import java.util.concurrent.ConcurrentHashMap;
32  import java.util.concurrent.ConcurrentMap;
33  import java.util.concurrent.CopyOnWriteArrayList;
34  import java.util.concurrent.Executor;
35  import java.util.concurrent.Future;
36  import java.util.concurrent.ScheduledExecutorService;
37  import java.util.concurrent.ScheduledFuture;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.atomic.AtomicInteger;
41  
42  import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
43  import org.eclipse.jetty.spdy.api.DataInfo;
44  import org.eclipse.jetty.spdy.api.GoAwayInfo;
45  import org.eclipse.jetty.spdy.api.Handler;
46  import org.eclipse.jetty.spdy.api.PingInfo;
47  import org.eclipse.jetty.spdy.api.RstInfo;
48  import org.eclipse.jetty.spdy.api.SPDYException;
49  import org.eclipse.jetty.spdy.api.Session;
50  import org.eclipse.jetty.spdy.api.SessionFrameListener;
51  import org.eclipse.jetty.spdy.api.SessionStatus;
52  import org.eclipse.jetty.spdy.api.Settings;
53  import org.eclipse.jetty.spdy.api.SettingsInfo;
54  import org.eclipse.jetty.spdy.api.Stream;
55  import org.eclipse.jetty.spdy.api.StreamFrameListener;
56  import org.eclipse.jetty.spdy.api.StreamStatus;
57  import org.eclipse.jetty.spdy.api.SynInfo;
58  import org.eclipse.jetty.spdy.frames.ControlFrame;
59  import org.eclipse.jetty.spdy.frames.ControlFrameType;
60  import org.eclipse.jetty.spdy.frames.CredentialFrame;
61  import org.eclipse.jetty.spdy.frames.DataFrame;
62  import org.eclipse.jetty.spdy.frames.GoAwayFrame;
63  import org.eclipse.jetty.spdy.frames.HeadersFrame;
64  import org.eclipse.jetty.spdy.frames.PingFrame;
65  import org.eclipse.jetty.spdy.frames.RstStreamFrame;
66  import org.eclipse.jetty.spdy.frames.SettingsFrame;
67  import org.eclipse.jetty.spdy.frames.SynReplyFrame;
68  import org.eclipse.jetty.spdy.frames.SynStreamFrame;
69  import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
70  import org.eclipse.jetty.spdy.generator.Generator;
71  import org.eclipse.jetty.spdy.parser.Parser;
72  import org.eclipse.jetty.util.Atomics;
73  import org.eclipse.jetty.util.component.AggregateLifeCycle;
74  import org.eclipse.jetty.util.component.Dumpable;
75  import org.eclipse.jetty.util.log.Log;
76  import org.eclipse.jetty.util.log.Logger;
77  
78  public class StandardSession implements ISession, Parser.Listener, Handler<StandardSession.FrameBytes>, Dumpable
79  {
    // Logger shared by every session; note that it is registered under the Session API interface.
    private static final Logger logger = Log.getLogger(Session.class);
    // Per-thread counter that starts at 0.
    // NOTE(review): its readers/writers are outside this excerpt; presumably it bounds nested handler invocations - confirm.
    private static final ThreadLocal<Integer> handlerInvocations = new ThreadLocal<Integer>()
    {
        @Override
        protected Integer initialValue()
        {
            return 0;
        }
    };

    // Application attributes exposed through getAttribute/setAttribute/removeAttribute.
    private final Map<String, Object> attributes = new ConcurrentHashMap<>();
    // Session listeners; those that are StreamListeners get stream created/closed events.
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();
    // Streams currently registered with this session, keyed by stream id.
    private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
    // Frames waiting to be written; flush() accesses it while synchronized on the queue itself.
    private final LinkedList<FrameBytes> queue = new LinkedList<>();
    private final ByteBufferPool bufferPool;
    // Executor used by execute(Runnable).
    private final Executor threadPool;
    // Schedules the timeout task of queued frames when a positive timeout is given.
    private final ScheduledExecutorService scheduler;
    // SPDY protocol version used when building frames.
    private final short version;
    // May be null (tests); closed by close().
    private final Controller<FrameBytes> controller;
    // May be null; told when frame processing starts (not idle) and ends (idle).
    private final IdleListener idleListener;
    // Next id for locally created streams; advanced by 2 for each new stream.
    private final AtomicInteger streamIds;
    // Next id for locally created pings; advanced by 2 for each new ping.
    private final AtomicInteger pingIds;
    // Session-level application listener; may be null.
    private final SessionFrameListener listener;
    // Turns control frames into bytes.
    private final Generator generator;
    // Set once a GO_AWAY has been requested locally; incoming frames are skipped afterwards.
    private final AtomicBoolean goAwaySent = new AtomicBoolean();
    // Set once a GO_AWAY has been received from the peer.
    private final AtomicBoolean goAwayReceived = new AtomicBoolean();
    // Highest id seen so far among streams whose id parity differs from the local one.
    private final AtomicInteger lastStreamId = new AtomicInteger();
    private final FlowControlStrategy flowControlStrategy;
    // Set to true by flush() just before it hands a frame to write().
    // NOTE(review): it is reset outside this excerpt - confirm where.
    private boolean flushing;
    // NOTE(review): not assigned in this excerpt; presumably records a session failure - confirm.
    private Throwable failure;
110 
111     public StandardSession(short version, ByteBufferPool bufferPool, Executor threadPool, ScheduledExecutorService scheduler,
112             Controller<FrameBytes> controller, IdleListener idleListener, int initialStreamId, SessionFrameListener listener,
113             Generator generator, FlowControlStrategy flowControlStrategy)
114     {
115         this.version = version;
116         this.bufferPool = bufferPool;
117         this.threadPool = threadPool;
118         this.scheduler = scheduler;
119         this.controller = controller;
120         this.idleListener = idleListener;
121         this.streamIds = new AtomicInteger(initialStreamId);
122         this.pingIds = new AtomicInteger(initialStreamId);
123         this.listener = listener;
124         this.generator = generator;
125         this.flowControlStrategy = flowControlStrategy;
126     }
127 
128     @Override
129     public short getVersion()
130     {
131         return version;
132     }
133 
134     @Override
135     public void addListener(Listener listener)
136     {
137         listeners.add(listener);
138     }
139 
140     @Override
141     public void removeListener(Listener listener)
142     {
143         listeners.remove(listener);
144     }
145 
146     @Override
147     public Future<Stream> syn(SynInfo synInfo, StreamFrameListener listener)
148     {
149         Promise<Stream> result = new Promise<>();
150         syn(synInfo,listener,0,TimeUnit.MILLISECONDS,result);
151         return result;
152     }
153 
154     @Override
155     public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Handler<Stream> handler)
156     {
157         // Synchronization is necessary.
158         // SPEC v3, 2.3.1 requires that the stream creation be monotonically crescent
159         // so we cannot allow thread1 to create stream1 and thread2 create stream3 and
160         // have stream3 hit the network before stream1, not only to comply with the spec
161         // but also because the compression context for the headers would be wrong, as the
162         // frame with a compression history will come before the first compressed frame.
163         int associatedStreamId = 0;
164         if (synInfo instanceof PushSynInfo)
165             associatedStreamId = ((PushSynInfo)synInfo).getAssociatedStreamId();
166 
167         synchronized (this)
168         {
169             int streamId = streamIds.getAndAdd(2);
170             // TODO: for SPDYv3 we need to support the "slot" argument
171             SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, associatedStreamId, synInfo.getPriority(), (short)0, synInfo.getHeaders());
172             IStream stream = createStream(synStream, listener, true);
173             generateAndEnqueueControlFrame(stream, synStream, timeout, unit, handler, stream);
174         }
175         flush();
176     }
177 
178     @Override
179     public Future<Void> rst(RstInfo rstInfo)
180     {
181         Promise<Void> result = new Promise<>();
182         rst(rstInfo,0,TimeUnit.MILLISECONDS,result);
183         return result;
184     }
185 
186     @Override
187     public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler<Void> handler)
188     {
189         // SPEC v3, 2.2.2
190         if (goAwaySent.get())
191         {
192             complete(handler,null);
193         }
194         else
195         {
196             int streamId = rstInfo.getStreamId();
197             IStream stream = streams.get(streamId);
198             RstStreamFrame frame = new RstStreamFrame(version,streamId,rstInfo.getStreamStatus().getCode(version));
199             control(stream,frame,timeout,unit,handler,null);
200             if (stream != null)
201             {
202                 stream.process(frame);
203                 removeStream(stream);
204             }
205         }
206     }
207 
208     @Override
209     public Future<Void> settings(SettingsInfo settingsInfo)
210     {
211         Promise<Void> result = new Promise<>();
212         settings(settingsInfo,0,TimeUnit.MILLISECONDS,result);
213         return result;
214     }
215 
216     @Override
217     public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Handler<Void> handler)
218     {
219         SettingsFrame frame = new SettingsFrame(version,settingsInfo.getFlags(),settingsInfo.getSettings());
220         control(null, frame, timeout, unit, handler, null);
221     }
222 
223     @Override
224     public Future<PingInfo> ping()
225     {
226         Promise<PingInfo> result = new Promise<>();
227         ping(0, TimeUnit.MILLISECONDS, result);
228         return result;
229     }
230 
231     @Override
232     public void ping(long timeout, TimeUnit unit, Handler<PingInfo> handler)
233     {
234         int pingId = pingIds.getAndAdd(2);
235         PingInfo pingInfo = new PingInfo(pingId);
236         PingFrame frame = new PingFrame(version,pingId);
237         control(null,frame,timeout,unit,handler,pingInfo);
238     }
239 
240     @Override
241     public Future<Void> goAway()
242     {
243         return goAway(SessionStatus.OK);
244     }
245 
246     private Future<Void> goAway(SessionStatus sessionStatus)
247     {
248         Promise<Void> result = new Promise<>();
249         goAway(sessionStatus, 0, TimeUnit.MILLISECONDS, result);
250         return result;
251     }
252 
253     @Override
254     public void goAway(long timeout, TimeUnit unit, Handler<Void> handler)
255     {
256         goAway(SessionStatus.OK, timeout, unit, handler);
257     }
258 
259     private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Handler<Void> handler)
260     {
261         if (goAwaySent.compareAndSet(false,true))
262         {
263             if (!goAwayReceived.get())
264             {
265                 GoAwayFrame frame = new GoAwayFrame(version,lastStreamId.get(),sessionStatus.getCode());
266                 control(null,frame,timeout,unit,handler,null);
267                 return;
268             }
269         }
270         complete(handler, null);
271     }
272 
273     @Override
274     public Set<Stream> getStreams()
275     {
276         Set<Stream> result = new HashSet<>();
277         result.addAll(streams.values());
278         return result;
279     }
280 
281     @Override
282     public IStream getStream(int streamId)
283     {
284         return streams.get(streamId);
285     }
286 
287     @Override
288     public Object getAttribute(String key)
289     {
290         return attributes.get(key);
291     }
292 
293     @Override
294     public void setAttribute(String key, Object value)
295     {
296         attributes.put(key, value);
297     }
298 
299     @Override
300     public Object removeAttribute(String key)
301     {
302         return attributes.remove(key);
303     }
304 
305     @Override
306     public void onControlFrame(ControlFrame frame)
307     {
308         notifyIdle(idleListener, false);
309         try
310         {
311             logger.debug("Processing {}", frame);
312 
313             if (goAwaySent.get())
314             {
315                 logger.debug("Skipped processing of {}", frame);
316                 return;
317             }
318 
319             switch (frame.getType())
320             {
321                 case SYN_STREAM:
322                 {
323                     onSyn((SynStreamFrame)frame);
324                     break;
325                 }
326                 case SYN_REPLY:
327                 {
328                     onReply((SynReplyFrame)frame);
329                     break;
330                 }
331                 case RST_STREAM:
332                 {
333                     onRst((RstStreamFrame)frame);
334                     break;
335                 }
336                 case SETTINGS:
337                 {
338                     onSettings((SettingsFrame)frame);
339                     break;
340                 }
341                 case NOOP:
342                 {
343                     // Just ignore it
344                     break;
345                 }
346                 case PING:
347                 {
348                     onPing((PingFrame)frame);
349                     break;
350                 }
351                 case GO_AWAY:
352                 {
353                     onGoAway((GoAwayFrame)frame);
354                     break;
355                 }
356                 case HEADERS:
357                 {
358                     onHeaders((HeadersFrame)frame);
359                     break;
360                 }
361                 case WINDOW_UPDATE:
362                 {
363                     onWindowUpdate((WindowUpdateFrame)frame);
364                     break;
365                 }
366                 case CREDENTIAL:
367                 {
368                     onCredential((CredentialFrame)frame);
369                     break;
370                 }
371                 default:
372                 {
373                     throw new IllegalStateException();
374                 }
375             }
376         }
377         finally
378         {
379             notifyIdle(idleListener, true);
380         }
381     }
382 
383     @Override
384     public void onDataFrame(DataFrame frame, ByteBuffer data)
385     {
386         notifyIdle(idleListener, false);
387         try
388         {
389             logger.debug("Processing {}, {} data bytes", frame, data.remaining());
390 
391             if (goAwaySent.get())
392             {
393                 logger.debug("Skipped processing of {}", frame);
394                 return;
395             }
396 
397             int streamId = frame.getStreamId();
398             IStream stream = streams.get(streamId);
399             if (stream == null)
400             {
401                 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
402                 logger.debug("Unknown stream {}", rstInfo);
403                 rst(rstInfo);
404             }
405             else
406             {
407                 processData(stream, frame, data);
408             }
409         }
410         finally
411         {
412             notifyIdle(idleListener, true);
413         }
414     }
415 
416     private void notifyIdle(IdleListener listener, boolean idle)
417     {
418         if (listener != null)
419             listener.onIdle(idle);
420     }
421 
422     private void processData(final IStream stream, DataFrame frame, ByteBuffer data)
423     {
424         ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data, frame.isClose(), frame.isCompress())
425         {
426             @Override
427             public void consume(int delta)
428             {
429                 super.consume(delta);
430                 flowControlStrategy.onDataConsumed(StandardSession.this, stream, this, delta);
431             }
432         };
433         flowControlStrategy.onDataReceived(this, stream, dataInfo);
434         stream.process(dataInfo);
435         if (stream.isClosed())
436             removeStream(stream);
437     }
438 
439     @Override
440     public void onStreamException(StreamException x)
441     {
442         notifyOnException(listener, x);
443         rst(new RstInfo(x.getStreamId(),x.getStreamStatus()));
444     }
445 
446     @Override
447     public void onSessionException(SessionException x)
448     {
449         Throwable cause = x.getCause();
450         notifyOnException(listener,cause == null?x:cause);
451         goAway(x.getSessionStatus());
452     }
453 
454     private void onSyn(SynStreamFrame frame)
455     {
456         IStream stream = createStream(frame, null, false);
457         if (stream != null)
458             processSyn(listener, stream, frame);
459     }
460 
461     private void processSyn(SessionFrameListener listener, IStream stream, SynStreamFrame frame)
462     {
463         stream.process(frame);
464         // Update the last stream id before calling the application (which may send a GO_AWAY)
465         updateLastStreamId(stream);
466         SynInfo synInfo = new SynInfo(frame.getHeaders(),frame.isClose(),frame.getPriority());
467         StreamFrameListener streamListener = notifyOnSyn(listener,stream,synInfo);
468         stream.setStreamFrameListener(streamListener);
469         flush();
470         // The onSyn() listener may have sent a frame that closed the stream
471         if (stream.isClosed())
472             removeStream(stream);
473     }
474 
475     private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local)
476     {
477         IStream stream = newStream(frame);
478         stream.updateCloseState(frame.isClose(), local);
479         stream.setStreamFrameListener(listener);
480 
481         if (stream.isUnidirectional())
482         {
483             // Unidirectional streams are implicitly half closed
484             stream.updateCloseState(true, !local);
485             if (!stream.isClosed())
486                 stream.getAssociatedStream().associate(stream);
487         }
488 
489         int streamId = stream.getId();
490         if (streams.putIfAbsent(streamId, stream) != null)
491         {
492             if (local)
493                 throw new IllegalStateException("Duplicate stream id " + streamId);
494             RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR);
495             logger.debug("Duplicate stream, {}", rstInfo);
496             rst(rstInfo);
497             return null;
498         }
499         else
500         {
501             logger.debug("Created {}", stream);
502             if (local)
503                 notifyStreamCreated(stream);
504             return stream;
505         }
506     }
507 
508     private IStream newStream(SynStreamFrame frame)
509     {
510         IStream associatedStream = streams.get(frame.getAssociatedStreamId());
511         IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream);
512         flowControlStrategy.onNewStream(this, stream);
513         return stream;
514     }
515 
516     private void notifyStreamCreated(IStream stream)
517     {
518         for (Listener listener : listeners)
519         {
520             if (listener instanceof StreamListener)
521             {
522                 try
523                 {
524                     ((StreamListener)listener).onStreamCreated(stream);
525                 }
526                 catch (Exception x)
527                 {
528                     logger.info("Exception while notifying listener " + listener, x);
529                 }
530                 catch (Error x)
531                 {
532                     logger.info("Exception while notifying listener " + listener, x);
533                     throw x;
534                 }
535             }
536         }
537     }
538 
539     private void removeStream(IStream stream)
540     {
541         if (stream.isUnidirectional())
542             stream.getAssociatedStream().disassociate(stream);
543 
544         IStream removed = streams.remove(stream.getId());
545         if (removed != null)
546             assert removed == stream;
547 
548         logger.debug("Removed {}", stream);
549         notifyStreamClosed(stream);
550     }
551 
552     private void notifyStreamClosed(IStream stream)
553     {
554         for (Listener listener : listeners)
555         {
556             if (listener instanceof StreamListener)
557             {
558                 try
559                 {
560                     ((StreamListener)listener).onStreamClosed(stream);
561                 }
562                 catch (Exception x)
563                 {
564                     logger.info("Exception while notifying listener " + listener, x);
565                 }
566                 catch (Error x)
567                 {
568                     logger.info("Exception while notifying listener " + listener, x);
569                     throw x;
570                 }
571             }
572         }
573     }
574 
575     private void onReply(SynReplyFrame frame)
576     {
577         int streamId = frame.getStreamId();
578         IStream stream = streams.get(streamId);
579         if (stream == null)
580         {
581             RstInfo rstInfo = new RstInfo(streamId,StreamStatus.INVALID_STREAM);
582             logger.debug("Unknown stream {}",rstInfo);
583             rst(rstInfo);
584         }
585         else
586         {
587             processReply(stream,frame);
588         }
589     }
590 
591     private void processReply(IStream stream, SynReplyFrame frame)
592     {
593         stream.process(frame);
594         if (stream.isClosed())
595             removeStream(stream);
596     }
597 
598     private void onRst(RstStreamFrame frame)
599     {
600         IStream stream = streams.get(frame.getStreamId());
601 
602         if (stream != null)
603             stream.process(frame);
604 
605         RstInfo rstInfo = new RstInfo(frame.getStreamId(),StreamStatus.from(frame.getVersion(),frame.getStatusCode()));
606         notifyOnRst(listener, rstInfo);
607         flush();
608 
609         if (stream != null)
610             removeStream(stream);
611     }
612 
613     private void onSettings(SettingsFrame frame)
614     {
615         Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
616         if (windowSizeSetting != null)
617         {
618             int windowSize = windowSizeSetting.value();
619             setWindowSize(windowSize);
620             logger.debug("Updated session window size to {}", windowSize);
621         }
622         SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted());
623         notifyOnSettings(listener, settingsInfo);
624         flush();
625     }
626 
627     private void onPing(PingFrame frame)
628     {
629         int pingId = frame.getPingId();
630         if (pingId % 2 == pingIds.get() % 2)
631         {
632             PingInfo pingInfo = new PingInfo(frame.getPingId());
633             notifyOnPing(listener, pingInfo);
634             flush();
635         }
636         else
637         {
638             control(null, frame, 0, TimeUnit.MILLISECONDS, null, null);
639         }
640     }
641 
642     private void onGoAway(GoAwayFrame frame)
643     {
644         if (goAwayReceived.compareAndSet(false, true))
645         {
646             GoAwayInfo goAwayInfo = new GoAwayInfo(frame.getLastStreamId(),SessionStatus.from(frame.getStatusCode()));
647             notifyOnGoAway(listener,goAwayInfo);
648             flush();
649             // SPDY does not require to send back a response to a GO_AWAY.
650             // We notified the application of the last good stream id,
651             // tried our best to flush remaining data, and close.
652             close();
653         }
654     }
655 
656     private void onHeaders(HeadersFrame frame)
657     {
658         int streamId = frame.getStreamId();
659         IStream stream = streams.get(streamId);
660         if (stream == null)
661         {
662             RstInfo rstInfo = new RstInfo(streamId,StreamStatus.INVALID_STREAM);
663             logger.debug("Unknown stream, {}",rstInfo);
664             rst(rstInfo);
665         }
666         else
667         {
668             processHeaders(stream,frame);
669         }
670     }
671 
672     private void processHeaders(IStream stream, HeadersFrame frame)
673     {
674         stream.process(frame);
675         if (stream.isClosed())
676             removeStream(stream);
677     }
678 
679     private void onWindowUpdate(WindowUpdateFrame frame)
680     {
681         int streamId = frame.getStreamId();
682         IStream stream = streams.get(streamId);
683         flowControlStrategy.onWindowUpdate(this, stream, frame.getWindowDelta());
684         flush();
685     }
686 
687     private void onCredential(CredentialFrame frame)
688     {
689         logger.warn("{} frame not yet supported", ControlFrameType.CREDENTIAL);
690         flush();
691     }
692 
693     protected void close()
694     {
695         // Check for null to support tests
696         if (controller != null)
697             controller.close(false);
698     }
699 
700     private void notifyOnException(SessionFrameListener listener, Throwable x)
701     {
702         try
703         {
704             if (listener != null)
705             {
706                 logger.debug("Invoking callback with {} on listener {}",x,listener);
707                 listener.onException(x);
708             }
709         }
710         catch (Exception xx)
711         {
712             logger.info("Exception while notifying listener " + listener, xx);
713         }
714         catch (Error xx)
715         {
716             logger.info("Exception while notifying listener " + listener, xx);
717             throw xx;
718         }
719     }
720 
721     private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo)
722     {
723         try
724         {
725             if (listener == null)
726                 return null;
727             logger.debug("Invoking callback with {} on listener {}",synInfo,listener);
728             return listener.onSyn(stream,synInfo);
729         }
730         catch (Exception x)
731         {
732             logger.info("Exception while notifying listener " + listener,x);
733             return null;
734         }
735         catch (Error x)
736         {
737             logger.info("Exception while notifying listener " + listener, x);
738             throw x;
739         }
740     }
741 
742     private void notifyOnRst(SessionFrameListener listener, RstInfo rstInfo)
743     {
744         try
745         {
746             if (listener != null)
747             {
748                 logger.debug("Invoking callback with {} on listener {}",rstInfo,listener);
749                 listener.onRst(this,rstInfo);
750             }
751         }
752         catch (Exception x)
753         {
754             logger.info("Exception while notifying listener " + listener, x);
755         }
756         catch (Error x)
757         {
758             logger.info("Exception while notifying listener " + listener, x);
759             throw x;
760         }
761     }
762 
763     private void notifyOnSettings(SessionFrameListener listener, SettingsInfo settingsInfo)
764     {
765         try
766         {
767             if (listener != null)
768             {
769                 logger.debug("Invoking callback with {} on listener {}",settingsInfo,listener);
770                 listener.onSettings(this, settingsInfo);
771             }
772         }
773         catch (Exception x)
774         {
775             logger.info("Exception while notifying listener " + listener, x);
776         }
777         catch (Error x)
778         {
779             logger.info("Exception while notifying listener " + listener, x);
780             throw x;
781         }
782     }
783 
784     private void notifyOnPing(SessionFrameListener listener, PingInfo pingInfo)
785     {
786         try
787         {
788             if (listener != null)
789             {
790                 logger.debug("Invoking callback with {} on listener {}",pingInfo,listener);
791                 listener.onPing(this, pingInfo);
792             }
793         }
794         catch (Exception x)
795         {
796             logger.info("Exception while notifying listener " + listener, x);
797         }
798         catch (Error x)
799         {
800             logger.info("Exception while notifying listener " + listener, x);
801             throw x;
802         }
803     }
804 
805     private void notifyOnGoAway(SessionFrameListener listener, GoAwayInfo goAwayInfo)
806     {
807         try
808         {
809             if (listener != null)
810             {
811                 logger.debug("Invoking callback with {} on listener {}",goAwayInfo,listener);
812                 listener.onGoAway(this, goAwayInfo);
813             }
814         }
815         catch (Exception x)
816         {
817             logger.info("Exception while notifying listener " + listener, x);
818         }
819         catch (Error x)
820         {
821             logger.info("Exception while notifying listener " + listener, x);
822             throw x;
823         }
824     }
825 
826     @Override
827     public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler<C> handler, C context)
828     {
829         generateAndEnqueueControlFrame(stream,frame,timeout,unit,handler,context);
830         flush();
831     }
832 
833     private <C> void generateAndEnqueueControlFrame(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler<C> handler, C context)
834     {
835         try
836         {
837             // Synchronization is necessary, since we may have concurrent replies
838             // and those needs to be generated and enqueued atomically in order
839             // to maintain a correct compression context
840             synchronized (this)
841             {
842                 ByteBuffer buffer = generator.control(frame);
843                 logger.debug("Queuing {} on {}", frame, stream);
844                 ControlFrameBytes<C> frameBytes = new ControlFrameBytes<>(stream, handler, context, frame, buffer);
845                 if (timeout > 0)
846                     frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
847 
848                 // Special handling for PING frames, they must be sent as soon as possible
849                 if (ControlFrameType.PING == frame.getType())
850                     prepend(frameBytes);
851                 else
852                     append(frameBytes);
853             }
854         }
855         catch (Exception x)
856         {
857             notifyHandlerFailed(handler, context, x);
858         }
859     }
860 
861     private void updateLastStreamId(IStream stream)
862     {
863         int streamId = stream.getId();
864         if (streamId % 2 != streamIds.get() % 2)
865             Atomics.updateMax(lastStreamId, streamId);
866     }
867 
868     @Override
869     public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler<C> handler, C context)
870     {
871         logger.debug("Queuing {} on {}",dataInfo,stream);
872         DataFrameBytes<C> frameBytes = new DataFrameBytes<>(stream,handler,context,dataInfo);
873         if (timeout > 0)
874             frameBytes.task = scheduler.schedule(frameBytes,timeout,unit);
875         append(frameBytes);
876         flush();
877     }
878 
879     private void execute(Runnable task)
880     {
881         threadPool.execute(task);
882     }
883 
884     @Override
885     public void flush()
886     {
887         FrameBytes frameBytes = null;
888         ByteBuffer buffer = null;
889         synchronized (queue)
890         {
891             if (flushing || queue.isEmpty())
892                 return;
893 
894             Set<IStream> stalledStreams = null;
895             for (int i = 0; i < queue.size(); ++i)
896             {
897                 frameBytes = queue.get(i);
898 
899                 IStream stream = frameBytes.getStream();
900                 if (stream != null && stalledStreams != null && stalledStreams.contains(stream))
901                     continue;
902 
903                 buffer = frameBytes.getByteBuffer();
904                 if (buffer != null)
905                 {
906                     queue.remove(i);
907                     if (stream != null && stream.isReset())
908                     {
909                         frameBytes.fail(new StreamException(stream.getId(),StreamStatus.INVALID_STREAM));
910                         return;
911                     }
912                     break;
913                 }
914 
915                 if (stalledStreams == null)
916                     stalledStreams = new HashSet<>();
917                 if (stream != null)
918                     stalledStreams.add(stream);
919 
920                 logger.debug("Flush stalled for {}, {} frame(s) in queue",frameBytes,queue.size());
921             }
922 
923             if (buffer == null)
924                 return;
925 
926             flushing = true;
927             logger.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size());
928         }
929         write(buffer,this,frameBytes);
930     }
931 
932     private void append(FrameBytes frameBytes)
933     {
934         Throwable failure;
935         synchronized (queue)
936         {
937             failure = this.failure;
938             if (failure == null)
939             {
940                 int index = queue.size();
941                 while (index > 0)
942                 {
943                     FrameBytes element = queue.get(index - 1);
944                     if (element.compareTo(frameBytes) >= 0)
945                         break;
946                     --index;
947                 }
948                 queue.add(index,frameBytes);
949             }
950         }
951 
952         if (failure != null)
953             frameBytes.fail(new SPDYException(failure));
954     }
955 
956     private void prepend(FrameBytes frameBytes)
957     {
958         Throwable failure;
959         synchronized (queue)
960         {
961             failure = this.failure;
962             if (failure == null)
963             {
964                 int index = 0;
965                 while (index < queue.size())
966                 {
967                     FrameBytes element = queue.get(index);
968                     if (element.compareTo(frameBytes) <= 0)
969                         break;
970                     ++index;
971                 }
972                 queue.add(index,frameBytes);
973             }
974         }
975 
976         if (failure != null)
977             frameBytes.fail(new SPDYException(failure));
978     }
979 
980     @Override
981     public void completed(FrameBytes frameBytes)
982     {
983         synchronized (queue)
984         {
985             logger.debug("Completed write of {}, {} frame(s) in queue",frameBytes,queue.size());
986             flushing = false;
987         }
988         frameBytes.complete();
989     }
990 
991     @Override
992     public void failed(FrameBytes frameBytes, Throwable x)
993     {
994         List<FrameBytes> frameBytesToFail = new ArrayList<>();
995         frameBytesToFail.add(frameBytes);
996 
997         synchronized (queue)
998         {
999             failure = x;
1000             String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue",frameBytes,queue.size());
1001             logger.debug(logMessage,x);
1002             frameBytesToFail.addAll(queue);
1003             queue.clear();
1004             flushing = false;
1005         }
1006 
1007         for (FrameBytes fb : frameBytesToFail)
1008             fb.fail(x);
1009     }
1010 
1011     protected void write(ByteBuffer buffer, Handler<FrameBytes> handler, FrameBytes frameBytes)
1012     {
1013         if (controller != null)
1014         {
1015             logger.debug("Writing {} frame bytes of {}",buffer.remaining(),frameBytes);
1016             controller.write(buffer,handler,frameBytes);
1017         }
1018     }
1019 
1020     private <C> void complete(final Handler<C> handler, final C context)
1021     {
1022         // Applications may send and queue up a lot of frames and
1023         // if we call Handler.completed() only synchronously we risk
1024         // starvation (for the last frames sent) and stack overflow.
1025         // Therefore every some invocation, we dispatch to a new thread
1026         Integer invocations = handlerInvocations.get();
1027         if (invocations >= 4)
1028         {
1029             execute(new Runnable()
1030             {
1031                 @Override
1032                 public void run()
1033                 {
1034                     if (handler != null)
1035                         notifyHandlerCompleted(handler,context);
1036                     flush();
1037                 }
1038             });
1039         }
1040         else
1041         {
1042             handlerInvocations.set(invocations + 1);
1043             try
1044             {
1045                 if (handler != null)
1046                     notifyHandlerCompleted(handler,context);
1047                 flush();
1048             }
1049             finally
1050             {
1051                 handlerInvocations.set(invocations);
1052             }
1053         }
1054     }
1055 
1056     private <C> void notifyHandlerCompleted(Handler<C> handler, C context)
1057     {
1058         try
1059         {
1060             handler.completed(context);
1061         }
1062         catch (Exception x)
1063         {
1064             logger.info("Exception while notifying handler " + handler, x);
1065         }
1066         catch (Error x)
1067         {
1068             logger.info("Exception while notifying handler " + handler, x);
1069             throw x;
1070         }
1071     }
1072 
1073     private <C> void notifyHandlerFailed(Handler<C> handler, C context, Throwable x)
1074     {
1075         try
1076         {
1077             if (handler != null)
1078                 handler.failed(context, x);
1079         }
1080         catch (Exception xx)
1081         {
1082             logger.info("Exception while notifying handler " + handler, xx);
1083         }
1084         catch (Error xx)
1085         {
1086             logger.info("Exception while notifying handler " + handler, xx);
1087             throw xx;
1088         }
1089     }
1090 
1091     public int getWindowSize()
1092     {
1093         return flowControlStrategy.getWindowSize(this);
1094     }
1095 
1096     public void setWindowSize(int initialWindowSize)
1097     {
1098         flowControlStrategy.setWindowSize(this, initialWindowSize);
1099     }
1100 
1101     public String toString()
1102     {
1103         return String.format("%s@%x{v%d,queuSize=%d,windowSize=%d,streams=%d}", getClass().getSimpleName(), hashCode(), version, queue.size(), getWindowSize(), streams.size());
1104     }
1105     
1106     
1107     @Override
1108     public String dump()
1109     {
1110         return AggregateLifeCycle.dump(this);
1111     }
1112 
1113     @Override
1114     public void dump(Appendable out, String indent) throws IOException
1115     {
1116         AggregateLifeCycle.dumpObject(out,this);
1117         AggregateLifeCycle.dump(out,indent,Collections.singletonList(controller),streams.values());
1118     }
1119 
1120 
1121 
1122     public interface FrameBytes extends Comparable<FrameBytes>
1123     {
1124         public IStream getStream();
1125 
1126         public abstract ByteBuffer getByteBuffer();
1127 
1128         public abstract void complete();
1129 
1130         public abstract void fail(Throwable throwable);
1131     }
1132 
1133     private abstract class AbstractFrameBytes<C> implements FrameBytes, Runnable
1134     {
1135         private final IStream stream;
1136         private final Handler<C> handler;
1137         private final C context;
1138         protected volatile ScheduledFuture<?> task;
1139 
1140         protected AbstractFrameBytes(IStream stream, Handler<C> handler, C context)
1141         {
1142             this.stream = stream;
1143             this.handler = handler;
1144             this.context = context;
1145         }
1146 
1147         @Override
1148         public IStream getStream()
1149         {
1150             return stream;
1151         }
1152 
1153         @Override
1154         public int compareTo(FrameBytes that)
1155         {
1156             // FrameBytes may have or not have a related stream (for example, PING do not have a related stream)
1157             // FrameBytes without related streams have higher priority
1158             IStream thisStream = getStream();
1159             IStream thatStream = that.getStream();
1160             if (thisStream == null)
1161                 return thatStream == null ? 0 : -1;
1162             if (thatStream == null)
1163                 return 1;
1164             // If this.stream.priority > that.stream.priority => this.stream has less priority than that.stream
1165             return thatStream.getPriority() - thisStream.getPriority();
1166         }
1167 
1168         @Override
1169         public void complete()
1170         {
1171             cancelTask();
1172             StandardSession.this.complete(handler,context);
1173         }
1174 
1175         @Override
1176         public void fail(Throwable x)
1177         {
1178             cancelTask();
1179             notifyHandlerFailed(handler,context,x);
1180             StandardSession.this.flush();
1181         }
1182 
1183         private void cancelTask()
1184         {
1185             ScheduledFuture<?> task = this.task;
1186             if (task != null)
1187                 task.cancel(false);
1188         }
1189 
1190         @Override
1191         public void run()
1192         {
1193             close();
1194             fail(new InterruptedByTimeoutException());
1195         }
1196     }
1197 
1198     private class ControlFrameBytes<C> extends AbstractFrameBytes<C>
1199     {
1200         private final ControlFrame frame;
1201         private final ByteBuffer buffer;
1202 
1203         private ControlFrameBytes(IStream stream, Handler<C> handler, C context, ControlFrame frame, ByteBuffer buffer)
1204         {
1205             super(stream,handler,context);
1206             this.frame = frame;
1207             this.buffer = buffer;
1208         }
1209 
1210         @Override
1211         public ByteBuffer getByteBuffer()
1212         {
1213             return buffer;
1214         }
1215 
1216         @Override
1217         public void complete()
1218         {
1219             bufferPool.release(buffer);
1220 
1221             super.complete();
1222 
1223             if (frame.getType() == ControlFrameType.GO_AWAY)
1224             {
1225                 // After sending a GO_AWAY we need to hard close the connection.
1226                 // Recipients will know the last good stream id and act accordingly.
1227                 close();
1228             }
1229             IStream stream = getStream();
1230             if (stream != null && stream.isClosed())
1231                 removeStream(stream);
1232         }
1233 
1234         @Override
1235         public String toString()
1236         {
1237             return frame.toString();
1238         }
1239     }
1240 
1241     private class DataFrameBytes<C> extends AbstractFrameBytes<C>
1242     {
1243         private final DataInfo dataInfo;
1244         private int size;
1245         private volatile ByteBuffer buffer;
1246 
1247         private DataFrameBytes(IStream stream, Handler<C> handler, C context, DataInfo dataInfo)
1248         {
1249             super(stream,handler,context);
1250             this.dataInfo = dataInfo;
1251         }
1252 
1253         @Override
1254         public ByteBuffer getByteBuffer()
1255         {
1256             try
1257             {
1258                 IStream stream = getStream();
1259                 int windowSize = stream.getWindowSize();
1260                 if (windowSize <= 0)
1261                     return null;
1262 
1263                 size = dataInfo.available();
1264                 if (size > windowSize)
1265                     size = windowSize;
1266 
1267                 buffer = generator.data(stream.getId(),size,dataInfo);
1268                 return buffer;
1269             }
1270             catch (Throwable x)
1271             {
1272                 fail(x);
1273                 return null;
1274             }
1275         }
1276 
1277         @Override
1278         public void complete()
1279         {
1280             bufferPool.release(buffer);
1281             IStream stream = getStream();
1282             flowControlStrategy.updateWindow(StandardSession.this, stream, -size);
1283             if (dataInfo.available() > 0)
1284             {
1285                 // We have written a frame out of this DataInfo, but there is more to write.
1286                 // We need to keep the correct ordering of frames, to avoid that another
1287                 // DataInfo for the same stream is written before this one is finished.
1288                 prepend(this);
1289                 flush();
1290             }
1291             else
1292             {
1293                 super.complete();
1294                 stream.updateCloseState(dataInfo.isClose(),true);
1295                 if (stream.isClosed())
1296                     removeStream(stream);
1297             }
1298         }
1299 
1300         @Override
1301         public String toString()
1302         {
1303             return String.format("DATA bytes @%x available=%d consumed=%d on %s", dataInfo.hashCode(), dataInfo.available(), dataInfo.consumed(), getStream());
1304         }
1305     }
1306 }