1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.InterruptedByTimeoutException;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.HashSet;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Objects;
32 import java.util.Set;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentMap;
35 import java.util.concurrent.CopyOnWriteArrayList;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.Executor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import java.util.concurrent.atomic.AtomicBoolean;
41 import java.util.concurrent.atomic.AtomicInteger;
42
43 import org.eclipse.jetty.io.ByteBufferPool;
44 import org.eclipse.jetty.io.EndPoint;
45 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
46 import org.eclipse.jetty.spdy.api.DataInfo;
47 import org.eclipse.jetty.spdy.api.GoAwayInfo;
48 import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
49 import org.eclipse.jetty.spdy.api.PingInfo;
50 import org.eclipse.jetty.spdy.api.PingResultInfo;
51 import org.eclipse.jetty.spdy.api.PushInfo;
52 import org.eclipse.jetty.spdy.api.RstInfo;
53 import org.eclipse.jetty.spdy.api.SPDYException;
54 import org.eclipse.jetty.spdy.api.Session;
55 import org.eclipse.jetty.spdy.api.SessionFrameListener;
56 import org.eclipse.jetty.spdy.api.SessionStatus;
57 import org.eclipse.jetty.spdy.api.Settings;
58 import org.eclipse.jetty.spdy.api.SettingsInfo;
59 import org.eclipse.jetty.spdy.api.Stream;
60 import org.eclipse.jetty.spdy.api.StreamFrameListener;
61 import org.eclipse.jetty.spdy.api.StreamStatus;
62 import org.eclipse.jetty.spdy.api.SynInfo;
63 import org.eclipse.jetty.spdy.frames.ControlFrame;
64 import org.eclipse.jetty.spdy.frames.ControlFrameType;
65 import org.eclipse.jetty.spdy.frames.CredentialFrame;
66 import org.eclipse.jetty.spdy.frames.DataFrame;
67 import org.eclipse.jetty.spdy.frames.GoAwayFrame;
68 import org.eclipse.jetty.spdy.frames.HeadersFrame;
69 import org.eclipse.jetty.spdy.frames.PingFrame;
70 import org.eclipse.jetty.spdy.frames.RstStreamFrame;
71 import org.eclipse.jetty.spdy.frames.SettingsFrame;
72 import org.eclipse.jetty.spdy.frames.SynReplyFrame;
73 import org.eclipse.jetty.spdy.frames.SynStreamFrame;
74 import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
75 import org.eclipse.jetty.spdy.generator.Generator;
76 import org.eclipse.jetty.spdy.parser.Parser;
77 import org.eclipse.jetty.util.Atomics;
78 import org.eclipse.jetty.util.BufferUtil;
79 import org.eclipse.jetty.util.Callback;
80 import org.eclipse.jetty.util.ForkInvoker;
81 import org.eclipse.jetty.util.FutureCallback;
82 import org.eclipse.jetty.util.FuturePromise;
83 import org.eclipse.jetty.util.Promise;
84 import org.eclipse.jetty.util.component.ContainerLifeCycle;
85 import org.eclipse.jetty.util.component.Dumpable;
86 import org.eclipse.jetty.util.log.Log;
87 import org.eclipse.jetty.util.log.Logger;
88 import org.eclipse.jetty.util.thread.Scheduler;
89
90 public class StandardSession implements ISession, Parser.Listener, Dumpable
91 {
92 private static final Logger LOG = Log.getLogger(Session.class);
93
94 private final ForkInvoker<Callback> invoker = new SessionInvoker();
95 private final Map<String, Object> attributes = new ConcurrentHashMap<>();
96 private final List<Listener> listeners = new CopyOnWriteArrayList<>();
97 private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
98 private final LinkedList<FrameBytes> queue = new LinkedList<>();
99 private final ByteBufferPool bufferPool;
100 private final Executor threadPool;
101 private final Scheduler scheduler;
102 private final short version;
103 private final Controller controller;
104 private final EndPoint endPoint;
105 private final IdleListener idleListener;
106 private final AtomicInteger streamIds;
107 private final AtomicInteger pingIds;
108 private final SessionFrameListener listener;
109 private final Generator generator;
110 private final AtomicBoolean goAwaySent = new AtomicBoolean();
111 private final AtomicBoolean goAwayReceived = new AtomicBoolean();
112 private final AtomicInteger lastStreamId = new AtomicInteger();
113 private final AtomicInteger localStreamCount = new AtomicInteger(0);
114 private final FlowControlStrategy flowControlStrategy;
115 private volatile int maxConcurrentLocalStreams = -1;
116 private boolean flushing;
117 private Throwable failure;
118
119 public StandardSession(short version, ByteBufferPool bufferPool, Executor threadPool, Scheduler scheduler,
120 Controller controller, EndPoint endPoint, IdleListener idleListener, int initialStreamId,
121 SessionFrameListener listener, Generator generator, FlowControlStrategy flowControlStrategy)
122 {
123
124 this.version = version;
125 this.bufferPool = bufferPool;
126 this.threadPool = threadPool;
127 this.scheduler = scheduler;
128 this.controller = controller;
129 this.endPoint = endPoint;
130 this.idleListener = idleListener;
131 this.streamIds = new AtomicInteger(initialStreamId);
132 this.pingIds = new AtomicInteger(initialStreamId);
133 this.listener = listener;
134 this.generator = generator;
135 this.flowControlStrategy = flowControlStrategy;
136 }
137
138 @Override
139 public short getVersion()
140 {
141 return version;
142 }
143
144 @Override
145 public void addListener(Listener listener)
146 {
147 listeners.add(listener);
148 }
149
150 @Override
151 public void removeListener(Listener listener)
152 {
153 listeners.remove(listener);
154 }
155
156 @Override
157 public Stream syn(SynInfo synInfo, StreamFrameListener listener) throws ExecutionException, InterruptedException, TimeoutException
158 {
159 FuturePromise<Stream> result = new FuturePromise<>();
160 syn(synInfo, listener, result);
161 if (synInfo.getTimeout() > 0)
162 return result.get(synInfo.getTimeout(), synInfo.getUnit());
163 else
164 return result.get();
165 }
166
167 @Override
168 public void syn(SynInfo synInfo, StreamFrameListener listener, Promise<Stream> promise)
169 {
170
171
172
173
174
175
176 int associatedStreamId = 0;
177 if (synInfo instanceof PushSynInfo)
178 associatedStreamId = ((PushSynInfo)synInfo).getAssociatedStreamId();
179
180 synchronized (this)
181 {
182 int streamId = streamIds.getAndAdd(2);
183
184 SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, associatedStreamId, synInfo.getPriority(), (short)0, synInfo.getHeaders());
185 IStream stream = createStream(synStream, listener, true, promise);
186 if (stream == null)
187 return;
188 generateAndEnqueueControlFrame(stream, synStream, synInfo.getTimeout(), synInfo.getUnit(), stream);
189 }
190 flush();
191 }
192
193 @Override
194 public void rst(RstInfo rstInfo) throws InterruptedException, ExecutionException, TimeoutException
195 {
196 FutureCallback result = new FutureCallback();
197 rst(rstInfo, result);
198 if (rstInfo.getTimeout() > 0)
199 result.get(rstInfo.getTimeout(), rstInfo.getUnit());
200 else
201 result.get();
202 }
203
204 @Override
205 public void rst(RstInfo rstInfo, Callback callback)
206 {
207
208 if (goAwaySent.get())
209 {
210 complete(callback);
211 }
212 else
213 {
214 int streamId = rstInfo.getStreamId();
215 IStream stream = streams.get(streamId);
216 RstStreamFrame frame = new RstStreamFrame(version, streamId, rstInfo.getStreamStatus().getCode(version));
217 control(stream, frame, rstInfo.getTimeout(), rstInfo.getUnit(), callback);
218 if (stream != null)
219 {
220 stream.process(frame);
221 removeFrameBytesFromQueue(stream);
222 removeStream(stream);
223 }
224 }
225 }
226
227 private void removeFrameBytesFromQueue(Stream stream)
228 {
229 synchronized (queue)
230 {
231 for (FrameBytes frameBytes : queue)
232 if (frameBytes.getStream() == stream)
233 queue.remove(frameBytes);
234 }
235 }
236
237 @Override
238 public void settings(SettingsInfo settingsInfo) throws ExecutionException, InterruptedException, TimeoutException
239 {
240 FutureCallback result = new FutureCallback();
241 settings(settingsInfo, result);
242 if (settingsInfo.getTimeout() > 0)
243 result.get(settingsInfo.getTimeout(), settingsInfo.getUnit());
244 else
245 result.get();
246 }
247
248 @Override
249 public void settings(SettingsInfo settingsInfo, Callback callback)
250 {
251 SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings());
252 control(null, frame, settingsInfo.getTimeout(), settingsInfo.getUnit(), callback);
253 }
254
255 @Override
256 public PingResultInfo ping(PingInfo pingInfo) throws ExecutionException, InterruptedException, TimeoutException
257 {
258 FuturePromise<PingResultInfo> result = new FuturePromise<>();
259 ping(pingInfo, result);
260 if (pingInfo.getTimeout() > 0)
261 return result.get(pingInfo.getTimeout(), pingInfo.getUnit());
262 else
263 return result.get();
264 }
265
266 @Override
267 public void ping(PingInfo pingInfo, Promise<PingResultInfo> promise)
268 {
269 int pingId = pingIds.getAndAdd(2);
270 PingInfoCallback pingInfoCallback = new PingInfoCallback(pingId, promise);
271 PingFrame frame = new PingFrame(version, pingId);
272 control(null, frame, pingInfo.getTimeout(), pingInfo.getUnit(), pingInfoCallback);
273 }
274
275 @Override
276 public void goAway(GoAwayInfo goAwayInfo) throws ExecutionException, InterruptedException, TimeoutException
277 {
278 goAway(goAwayInfo, SessionStatus.OK);
279 }
280
281 private void goAway(GoAwayInfo goAwayInfo, SessionStatus sessionStatus) throws ExecutionException, InterruptedException, TimeoutException
282 {
283 FutureCallback result = new FutureCallback();
284 goAway(sessionStatus, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), result);
285 if (goAwayInfo.getTimeout() > 0)
286 result.get(goAwayInfo.getTimeout(), goAwayInfo.getUnit());
287 else
288 result.get();
289 }
290
291 @Override
292 public void goAway(GoAwayInfo goAwayInfo, Callback callback)
293 {
294 goAway(SessionStatus.OK, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), callback);
295 }
296
297 private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Callback callback)
298 {
299 if (goAwaySent.compareAndSet(false, true))
300 {
301 if (!goAwayReceived.get())
302 {
303 GoAwayFrame frame = new GoAwayFrame(version, lastStreamId.get(), sessionStatus.getCode());
304 control(null, frame, timeout, unit, callback);
305 return;
306 }
307 }
308 complete(callback);
309 }
310
311 @Override
312 public Set<Stream> getStreams()
313 {
314 Set<Stream> result = new HashSet<>();
315 result.addAll(streams.values());
316 return result;
317 }
318
319 @Override
320 public IStream getStream(int streamId)
321 {
322 return streams.get(streamId);
323 }
324
325 @Override
326 public Object getAttribute(String key)
327 {
328 return attributes.get(key);
329 }
330
331 @Override
332 public void setAttribute(String key, Object value)
333 {
334 attributes.put(key, value);
335 }
336
337 @Override
338 public Object removeAttribute(String key)
339 {
340 return attributes.remove(key);
341 }
342
343 @Override
344 public InetSocketAddress getLocalAddress()
345 {
346 return endPoint.getLocalAddress();
347 }
348
349 @Override
350 public InetSocketAddress getRemoteAddress()
351 {
352 return endPoint.getRemoteAddress();
353 }
354
355 @Override
356 public void onControlFrame(ControlFrame frame)
357 {
358 notifyIdle(idleListener, false);
359 try
360 {
361 LOG.debug("Processing {}", frame);
362
363 if (goAwaySent.get())
364 {
365 LOG.debug("Skipped processing of {}", frame);
366 return;
367 }
368
369 switch (frame.getType())
370 {
371 case SYN_STREAM:
372 {
373 onSyn((SynStreamFrame)frame);
374 break;
375 }
376 case SYN_REPLY:
377 {
378 onReply((SynReplyFrame)frame);
379 break;
380 }
381 case RST_STREAM:
382 {
383 onRst((RstStreamFrame)frame);
384 break;
385 }
386 case SETTINGS:
387 {
388 onSettings((SettingsFrame)frame);
389 break;
390 }
391 case NOOP:
392 {
393
394 break;
395 }
396 case PING:
397 {
398 onPing((PingFrame)frame);
399 break;
400 }
401 case GO_AWAY:
402 {
403 onGoAway((GoAwayFrame)frame);
404 break;
405 }
406 case HEADERS:
407 {
408 onHeaders((HeadersFrame)frame);
409 break;
410 }
411 case WINDOW_UPDATE:
412 {
413 onWindowUpdate((WindowUpdateFrame)frame);
414 break;
415 }
416 case CREDENTIAL:
417 {
418 onCredential((CredentialFrame)frame);
419 break;
420 }
421 default:
422 {
423 throw new IllegalStateException();
424 }
425 }
426 }
427 finally
428 {
429 notifyIdle(idleListener, true);
430 }
431 }
432
433 @Override
434 public void onDataFrame(DataFrame frame, ByteBuffer data)
435 {
436 notifyIdle(idleListener, false);
437 try
438 {
439 LOG.debug("Processing {}, {} data bytes", frame, data.remaining());
440
441 if (goAwaySent.get())
442 {
443 LOG.debug("Skipped processing of {}", frame);
444 return;
445 }
446
447 int streamId = frame.getStreamId();
448 IStream stream = streams.get(streamId);
449 if (stream == null)
450 {
451 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
452 LOG.debug("Unknown stream {}", rstInfo);
453 rst(rstInfo, new Callback.Adapter());
454 }
455 else
456 {
457 processData(stream, frame, data);
458 }
459 }
460 finally
461 {
462 notifyIdle(idleListener, true);
463 }
464 }
465
466 private void notifyIdle(IdleListener listener, boolean idle)
467 {
468 if (listener != null)
469 listener.onIdle(idle);
470 }
471
472 private void processData(final IStream stream, DataFrame frame, ByteBuffer data)
473 {
474 ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data, frame.isClose())
475 {
476 @Override
477 public void consume(int delta)
478 {
479 super.consume(delta);
480 flowControlStrategy.onDataConsumed(StandardSession.this, stream, this, delta);
481 }
482 };
483 flowControlStrategy.onDataReceived(this, stream, dataInfo);
484 stream.process(dataInfo);
485 if (stream.isClosed())
486 removeStream(stream);
487 }
488
489 @Override
490 public void onStreamException(StreamException x)
491 {
492 notifyOnException(listener, x);
493 rst(new RstInfo(x.getStreamId(), x.getStreamStatus()), new Callback.Adapter());
494 }
495
496 @Override
497 public void onSessionException(SessionException x)
498 {
499 Throwable cause = x.getCause();
500 notifyOnException(listener, cause == null ? x : cause);
501 goAway(x.getSessionStatus(), 0, TimeUnit.SECONDS, new Callback.Adapter());
502 }
503
504 private void onSyn(final SynStreamFrame frame)
505 {
506 IStream stream = createStream(frame, null, false, new Promise.Adapter<Stream>()
507 {
508 @Override
509 public void failed(Throwable x)
510 {
511 LOG.debug("Received: {} but creating new Stream failed: {}", frame, x.getMessage());
512 }
513 });
514 if (stream != null)
515 processSyn(listener, stream, frame);
516 }
517
518 private void processSyn(SessionFrameListener listener, IStream stream, SynStreamFrame frame)
519 {
520 stream.process(frame);
521
522 updateLastStreamId(stream);
523 StreamFrameListener streamListener;
524 if (stream.isUnidirectional())
525 {
526 PushInfo pushInfo = new PushInfo(frame.getHeaders(), frame.isClose());
527 streamListener = notifyOnPush(stream.getAssociatedStream().getStreamFrameListener(), stream, pushInfo);
528 }
529 else
530 {
531 SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority());
532 streamListener = notifyOnSyn(listener, stream, synInfo);
533 }
534 stream.setStreamFrameListener(streamListener);
535 flush();
536
537 if (stream.isClosed())
538 removeStream(stream);
539 }
540
541 private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local, Promise<Stream> promise)
542 {
543 IStream associatedStream = streams.get(frame.getAssociatedStreamId());
544 IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream, promise);
545 flowControlStrategy.onNewStream(this, stream);
546
547 stream.updateCloseState(frame.isClose(), local);
548 stream.setStreamFrameListener(listener);
549
550 if (stream.isUnidirectional())
551 {
552
553 stream.updateCloseState(true, !local);
554 if (!stream.isClosed())
555 stream.getAssociatedStream().associate(stream);
556 }
557
558 int streamId = stream.getId();
559
560 if (local)
561 {
562 while (true)
563 {
564 int oldStreamCountValue = localStreamCount.get();
565 int maxConcurrentStreams = maxConcurrentLocalStreams;
566 if (maxConcurrentStreams > -1 && oldStreamCountValue >= maxConcurrentStreams)
567 {
568 String message = String.format("Max concurrent local streams (%d) exceeded.",
569 maxConcurrentStreams);
570 LOG.debug(message);
571 promise.failed(new SPDYException(message));
572 return null;
573 }
574 if (localStreamCount.compareAndSet(oldStreamCountValue, oldStreamCountValue + 1))
575 break;
576 }
577 }
578
579 if (streams.putIfAbsent(streamId, stream) != null)
580 {
581 String message = "Duplicate stream id " + streamId;
582 IllegalStateException duplicateIdException = new IllegalStateException(message);
583 promise.failed(duplicateIdException);
584 if (local)
585 {
586 localStreamCount.decrementAndGet();
587 throw duplicateIdException;
588 }
589 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR);
590 LOG.debug("Duplicate stream, {}", rstInfo);
591 rst(rstInfo, new Callback.Adapter());
592 return null;
593 }
594 else
595 {
596 LOG.debug("Created {}", stream);
597 notifyStreamCreated(stream);
598 return stream;
599 }
600 }
601
602 private void notifyStreamCreated(IStream stream)
603 {
604 for (Listener listener : listeners)
605 {
606 if (listener instanceof StreamListener)
607 {
608 try
609 {
610 ((StreamListener)listener).onStreamCreated(stream);
611 }
612 catch (Exception x)
613 {
614 LOG.info("Exception while notifying listener " + listener, x);
615 }
616 catch (Error x)
617 {
618 LOG.info("Exception while notifying listener " + listener, x);
619 throw x;
620 }
621 }
622 }
623 }
624
625 private void removeStream(IStream stream)
626 {
627 if (stream.isUnidirectional())
628 stream.getAssociatedStream().disassociate(stream);
629
630 IStream removed = streams.remove(stream.getId());
631 if (removed != null)
632 {
633 assert removed == stream;
634
635 if (streamIds.get() % 2 == stream.getId() % 2)
636 localStreamCount.decrementAndGet();
637
638 LOG.debug("Removed {}", stream);
639 notifyStreamClosed(stream);
640 }
641 }
642
643 private void notifyStreamClosed(IStream stream)
644 {
645 for (Listener listener : listeners)
646 {
647 if (listener instanceof StreamListener)
648 {
649 try
650 {
651 ((StreamListener)listener).onStreamClosed(stream);
652 }
653 catch (Exception x)
654 {
655 LOG.info("Exception while notifying listener " + listener, x);
656 }
657 catch (Error x)
658 {
659 LOG.info("Exception while notifying listener " + listener, x);
660 throw x;
661 }
662 }
663 }
664 }
665
666 private void onReply(SynReplyFrame frame)
667 {
668 int streamId = frame.getStreamId();
669 IStream stream = streams.get(streamId);
670 if (stream == null)
671 {
672 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
673 LOG.debug("Unknown stream {}", rstInfo);
674 rst(rstInfo, new Callback.Adapter());
675 }
676 else
677 {
678 processReply(stream, frame);
679 }
680 }
681
682 private void processReply(IStream stream, SynReplyFrame frame)
683 {
684 stream.process(frame);
685 if (stream.isClosed())
686 removeStream(stream);
687 }
688
689 private void onRst(RstStreamFrame frame)
690 {
691 IStream stream = streams.get(frame.getStreamId());
692
693 if (stream != null)
694 stream.process(frame);
695
696 RstInfo rstInfo = new RstInfo(frame.getStreamId(), StreamStatus.from(frame.getVersion(), frame.getStatusCode()));
697 notifyOnRst(listener, rstInfo);
698 flush();
699
700 if (stream != null)
701 removeStream(stream);
702 }
703
704 private void onSettings(SettingsFrame frame)
705 {
706 Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
707 if (windowSizeSetting != null)
708 {
709 int windowSize = windowSizeSetting.value();
710 setWindowSize(windowSize);
711 LOG.debug("Updated session window size to {}", windowSize);
712 }
713 Settings.Setting maxConcurrentStreamsSetting = frame.getSettings().get(Settings.ID.MAX_CONCURRENT_STREAMS);
714 if (maxConcurrentStreamsSetting != null)
715 {
716 int maxConcurrentStreamsValue = maxConcurrentStreamsSetting.value();
717 maxConcurrentLocalStreams = maxConcurrentStreamsValue;
718 LOG.debug("Updated session maxConcurrentLocalStreams to {}", maxConcurrentStreamsValue);
719 }
720 SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted());
721 notifyOnSettings(listener, settingsInfo);
722 flush();
723 }
724
725 private void onPing(PingFrame frame)
726 {
727 int pingId = frame.getPingId();
728 if (pingId % 2 == pingIds.get() % 2)
729 {
730 PingResultInfo pingResultInfo = new PingResultInfo(frame.getPingId());
731 notifyOnPing(listener, pingResultInfo);
732 flush();
733 }
734 else
735 {
736 control(null, frame, 0, TimeUnit.MILLISECONDS, new Callback.Adapter());
737 }
738 }
739
740 private void onGoAway(GoAwayFrame frame)
741 {
742 if (goAwayReceived.compareAndSet(false, true))
743 {
744
745 GoAwayResultInfo goAwayResultInfo = new GoAwayResultInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode()));
746 notifyOnGoAway(listener, goAwayResultInfo);
747 flush();
748
749
750
751 }
752 }
753
754 private void onHeaders(HeadersFrame frame)
755 {
756 int streamId = frame.getStreamId();
757 IStream stream = streams.get(streamId);
758 if (stream == null)
759 {
760 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
761 LOG.debug("Unknown stream, {}", rstInfo);
762 rst(rstInfo, new Callback.Adapter());
763 }
764 else
765 {
766 processHeaders(stream, frame);
767 }
768 }
769
770 private void processHeaders(IStream stream, HeadersFrame frame)
771 {
772 stream.process(frame);
773 if (stream.isClosed())
774 removeStream(stream);
775 }
776
777 private void onWindowUpdate(WindowUpdateFrame frame)
778 {
779 int streamId = frame.getStreamId();
780 IStream stream = streams.get(streamId);
781 flowControlStrategy.onWindowUpdate(this, stream, frame.getWindowDelta());
782 flush();
783 }
784
785 private void onCredential(CredentialFrame frame)
786 {
787 LOG.warn("{} frame not yet supported", frame.getType());
788 flush();
789 }
790
791 protected void close()
792 {
793
794 if (controller != null)
795 controller.close(false);
796 }
797
798 private void notifyOnException(SessionFrameListener listener, Throwable x)
799 {
800 try
801 {
802 if (listener != null)
803 {
804 LOG.debug("Invoking callback with {} on listener {}", x, listener);
805 listener.onException(x);
806 }
807 }
808 catch (Exception xx)
809 {
810 LOG.info("Exception while notifying listener " + listener, xx);
811 }
812 catch (Error xx)
813 {
814 LOG.info("Exception while notifying listener " + listener, xx);
815 throw xx;
816 }
817 }
818
819 private StreamFrameListener notifyOnPush(StreamFrameListener listener, Stream stream, PushInfo pushInfo)
820 {
821 try
822 {
823 if (listener == null)
824 return null;
825 LOG.debug("Invoking callback with {} on listener {}", pushInfo, listener);
826 return listener.onPush(stream, pushInfo);
827 }
828 catch (Exception x)
829 {
830 LOG.info("Exception while notifying listener " + listener, x);
831 return null;
832 }
833 catch (Error x)
834 {
835 LOG.info("Exception while notifying listener " + listener, x);
836 throw x;
837 }
838 }
839
840 private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo)
841 {
842 try
843 {
844 if (listener == null)
845 return null;
846 LOG.debug("Invoking callback with {} on listener {}", synInfo, listener);
847 return listener.onSyn(stream, synInfo);
848 }
849 catch (Exception x)
850 {
851 LOG.info("Exception while notifying listener " + listener, x);
852 return null;
853 }
854 catch (Error x)
855 {
856 LOG.info("Exception while notifying listener " + listener, x);
857 throw x;
858 }
859 }
860
861 private void notifyOnRst(SessionFrameListener listener, RstInfo rstInfo)
862 {
863 try
864 {
865 if (listener != null)
866 {
867 LOG.debug("Invoking callback with {} on listener {}", rstInfo, listener);
868 listener.onRst(this, rstInfo);
869 }
870 }
871 catch (Exception x)
872 {
873 LOG.info("Exception while notifying listener " + listener, x);
874 }
875 catch (Error x)
876 {
877 LOG.info("Exception while notifying listener " + listener, x);
878 throw x;
879 }
880 }
881
882 private void notifyOnSettings(SessionFrameListener listener, SettingsInfo settingsInfo)
883 {
884 try
885 {
886 if (listener != null)
887 {
888 LOG.debug("Invoking callback with {} on listener {}", settingsInfo, listener);
889 listener.onSettings(this, settingsInfo);
890 }
891 }
892 catch (Exception x)
893 {
894 LOG.info("Exception while notifying listener " + listener, x);
895 }
896 catch (Error x)
897 {
898 LOG.info("Exception while notifying listener " + listener, x);
899 throw x;
900 }
901 }
902
903 private void notifyOnPing(SessionFrameListener listener, PingResultInfo pingResultInfo)
904 {
905 try
906 {
907 if (listener != null)
908 {
909 LOG.debug("Invoking callback with {} on listener {}", pingResultInfo, listener);
910 listener.onPing(this, pingResultInfo);
911 }
912 }
913 catch (Exception x)
914 {
915 LOG.info("Exception while notifying listener " + listener, x);
916 }
917 catch (Error x)
918 {
919 LOG.info("Exception while notifying listener " + listener, x);
920 throw x;
921 }
922 }
923
924 private void notifyOnGoAway(SessionFrameListener listener, GoAwayResultInfo goAwayResultInfo)
925 {
926 try
927 {
928 if (listener != null)
929 {
930 LOG.debug("Invoking callback with {} on listener {}", goAwayResultInfo, listener);
931 listener.onGoAway(this, goAwayResultInfo);
932 }
933 }
934 catch (Exception x)
935 {
936 LOG.info("Exception while notifying listener " + listener, x);
937 }
938 catch (Error x)
939 {
940 LOG.info("Exception while notifying listener " + listener, x);
941 throw x;
942 }
943 }
944
945
946 @Override
947 public void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
948 {
949 generateAndEnqueueControlFrame(stream, frame, timeout, unit, callback);
950 flush();
951 }
952
953 private void generateAndEnqueueControlFrame(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
954 {
955 try
956 {
957
958
959
960 synchronized (this)
961 {
962 ByteBuffer buffer = generator.control(frame);
963 LOG.debug("Queuing {} on {}", frame, stream);
964 ControlFrameBytes frameBytes = new ControlFrameBytes(stream, callback, frame, buffer);
965 if (timeout > 0)
966 frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
967
968
969 if (ControlFrameType.PING == frame.getType())
970 prepend(frameBytes);
971 else
972 append(frameBytes);
973 }
974 }
975 catch (Exception x)
976 {
977 notifyCallbackFailed(callback, x);
978 }
979 }
980
981 private void updateLastStreamId(IStream stream)
982 {
983 int streamId = stream.getId();
984 if (streamId % 2 != streamIds.get() % 2)
985 Atomics.updateMax(lastStreamId, streamId);
986 }
987
988 @Override
989 public void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Callback callback)
990 {
991 LOG.debug("Queuing {} on {}", dataInfo, stream);
992 DataFrameBytes frameBytes = new DataFrameBytes(stream, callback, dataInfo);
993 if (timeout > 0)
994 frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
995 append(frameBytes);
996 flush();
997 }
998
999 @Override
1000 public void shutdown()
1001 {
1002 FrameBytes frameBytes = new CloseFrameBytes();
1003 append(frameBytes);
1004 flush();
1005 }
1006
1007 private void execute(Runnable task)
1008 {
1009 threadPool.execute(task);
1010 }
1011
1012 @Override
1013 public void flush()
1014 {
1015 FrameBytes frameBytes = null;
1016 ByteBuffer buffer = null;
1017 boolean failFrameBytes = false;
1018 synchronized (queue)
1019 {
1020 if (flushing || queue.isEmpty())
1021 return;
1022
1023 Set<IStream> stalledStreams = null;
1024 for (int i = 0; i < queue.size(); ++i)
1025 {
1026 frameBytes = queue.get(i);
1027
1028 IStream stream = frameBytes.getStream();
1029 if (stream != null && stalledStreams != null && stalledStreams.contains(stream))
1030 continue;
1031
1032 buffer = frameBytes.getByteBuffer();
1033 if (buffer != null)
1034 {
1035 queue.remove(i);
1036 if (stream != null && stream.isReset() && !(frameBytes instanceof ControlFrameBytes))
1037 failFrameBytes = true;
1038 break;
1039 }
1040
1041 if (stalledStreams == null)
1042 stalledStreams = new HashSet<>();
1043 if (stream != null)
1044 stalledStreams.add(stream);
1045
1046 LOG.debug("Flush stalled for {}, {} frame(s) in queue", frameBytes, queue.size());
1047 }
1048
1049 if (buffer == null)
1050 return;
1051
1052 if (!failFrameBytes)
1053 {
1054 flushing = true;
1055 LOG.debug("Flushing {}, {} frame(s) in queue", frameBytes, queue.size());
1056 }
1057 }
1058 if (failFrameBytes)
1059 {
1060 frameBytes.fail(new StreamException(frameBytes.getStream().getId(), StreamStatus.INVALID_STREAM,
1061 "Stream: " + frameBytes.getStream() + " is reset!"));
1062 }
1063 else
1064 {
1065 write(buffer, frameBytes);
1066 }
1067 }
1068
1069 void append(FrameBytes frameBytes)
1070 {
1071 Throwable failure;
1072 synchronized (queue)
1073 {
1074 failure = this.failure;
1075 if (failure == null)
1076 {
1077
1078
1079 if (frameBytes instanceof ControlFrameBytes)
1080 queue.addLast(frameBytes);
1081 else
1082 {
1083 int index = queue.size();
1084 while (index > 0)
1085 {
1086 FrameBytes element = queue.get(index - 1);
1087 if (element.compareTo(frameBytes) >= 0)
1088 break;
1089 --index;
1090 }
1091 queue.add(index, frameBytes);
1092 }
1093 }
1094 }
1095
1096 if (failure != null)
1097 frameBytes.fail(new SPDYException(failure));
1098 }
1099
1100 private void prepend(FrameBytes frameBytes)
1101 {
1102 Throwable failure;
1103 synchronized (queue)
1104 {
1105 failure = this.failure;
1106 if (failure == null)
1107 {
1108 int index = 0;
1109 while (index < queue.size())
1110 {
1111 FrameBytes element = queue.get(index);
1112 if (element.compareTo(frameBytes) <= 0)
1113 break;
1114 ++index;
1115 }
1116 queue.add(index, frameBytes);
1117 }
1118 }
1119
1120 if (failure != null)
1121 frameBytes.fail(new SPDYException(failure));
1122 }
1123
1124 protected void write(ByteBuffer buffer, Callback callback)
1125 {
1126 if (controller != null)
1127 {
1128 LOG.debug("Writing {} frame bytes of {}", buffer.remaining(), buffer.limit());
1129 controller.write(buffer, callback);
1130 }
1131 }
1132
1133 private void complete(final Callback callback)
1134 {
1135
1136
1137
1138
1139 invoker.invoke(callback);
1140 }
1141
1142 private void notifyCallbackFailed(Callback callback, Throwable x)
1143 {
1144 try
1145 {
1146 if (callback != null)
1147 callback.failed(x);
1148 }
1149 catch (Exception xx)
1150 {
1151 LOG.info("Exception while notifying callback " + callback, xx);
1152 }
1153 catch (Error xx)
1154 {
1155 LOG.info("Exception while notifying callback " + callback, xx);
1156 throw xx;
1157 }
1158 }
1159
1160 public int getWindowSize()
1161 {
1162 return flowControlStrategy.getWindowSize(this);
1163 }
1164
1165 public void setWindowSize(int initialWindowSize)
1166 {
1167 flowControlStrategy.setWindowSize(this, initialWindowSize);
1168 }
1169
1170 @Override
1171 public String toString()
1172 {
1173 return String.format("%s@%x{v%d,queueSize=%d,windowSize=%d,streams=%d}", getClass().getSimpleName(),
1174 hashCode(), version, queue.size(), getWindowSize(), streams.size());
1175 }
1176
1177 @Override
1178 public String dump()
1179 {
1180 return ContainerLifeCycle.dump(this);
1181 }
1182
1183 @Override
1184 public void dump(Appendable out, String indent) throws IOException
1185 {
1186 ContainerLifeCycle.dumpObject(out, this);
1187 ContainerLifeCycle.dump(out, indent, Collections.singletonList(controller), streams.values());
1188 }
1189
1190 private class SessionInvoker extends ForkInvoker<Callback>
1191 {
1192 private SessionInvoker()
1193 {
1194 super(4);
1195 }
1196
1197 @Override
1198 public void fork(final Callback callback)
1199 {
1200 execute(new Runnable()
1201 {
1202 @Override
1203 public void run()
1204 {
1205 callback.succeeded();
1206 flush();
1207 }
1208 });
1209 }
1210
1211 @Override
1212 public void call(Callback callback)
1213 {
1214 callback.succeeded();
1215 flush();
1216 }
1217 }
1218
1219 public interface FrameBytes extends Comparable<FrameBytes>, Callback
1220 {
1221 public IStream getStream();
1222
1223 public abstract ByteBuffer getByteBuffer();
1224
1225 public abstract void complete();
1226
1227 public abstract void fail(Throwable throwable);
1228 }
1229
1230 abstract class AbstractFrameBytes implements FrameBytes, Runnable
1231 {
1232 private final IStream stream;
1233 private final Callback callback;
1234 protected volatile Scheduler.Task task;
1235
1236 protected AbstractFrameBytes(IStream stream, Callback callback)
1237 {
1238 this.stream = stream;
1239 this.callback = Objects.requireNonNull(callback);
1240 }
1241
1242 @Override
1243 public IStream getStream()
1244 {
1245 return stream;
1246 }
1247
1248 @Override
1249 public int compareTo(FrameBytes that)
1250 {
1251
1252
1253 IStream thisStream = getStream();
1254 IStream thatStream = that.getStream();
1255 if (thisStream == null)
1256 return thatStream == null ? 0 : -1;
1257 if (thatStream == null)
1258 return 1;
1259
1260 return thatStream.getPriority() - thisStream.getPriority();
1261 }
1262
1263 @Override
1264 public void complete()
1265 {
1266 cancelTask();
1267 StandardSession.this.complete(callback);
1268 }
1269
1270 @Override
1271 public void fail(Throwable x)
1272 {
1273 cancelTask();
1274 notifyCallbackFailed(callback, x);
1275 StandardSession.this.flush();
1276 }
1277
1278 private void cancelTask()
1279 {
1280 Scheduler.Task task = this.task;
1281 if (task != null)
1282 task.cancel();
1283 }
1284
1285 @Override
1286 public void run()
1287 {
1288 close();
1289 fail(new InterruptedByTimeoutException());
1290 }
1291
1292 @Override
1293 public void succeeded()
1294 {
1295 synchronized (queue)
1296 {
1297 if (LOG.isDebugEnabled())
1298 LOG.debug("Completed write of {}, {} frame(s) in queue", this, queue.size());
1299 flushing = false;
1300 }
1301 complete();
1302 }
1303
1304 @Override
1305 public void failed(Throwable x)
1306 {
1307 List<FrameBytes> frameBytesToFail = new ArrayList<>();
1308 frameBytesToFail.add(this);
1309
1310 synchronized (queue)
1311 {
1312 failure = x;
1313 if (LOG.isDebugEnabled())
1314 {
1315 String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue", this, queue.size());
1316 LOG.debug(logMessage, x);
1317 }
1318 frameBytesToFail.addAll(queue);
1319 queue.clear();
1320 flushing = false;
1321 }
1322
1323 for (FrameBytes fb : frameBytesToFail)
1324 fb.fail(x);
1325 }
1326 }
1327
1328 private class ControlFrameBytes extends AbstractFrameBytes
1329 {
1330 private final ControlFrame frame;
1331 private final ByteBuffer buffer;
1332
1333 private ControlFrameBytes(IStream stream, Callback callback, ControlFrame frame, ByteBuffer buffer)
1334 {
1335 super(stream, callback);
1336 this.frame = frame;
1337 this.buffer = buffer;
1338 }
1339
1340 @Override
1341 public ByteBuffer getByteBuffer()
1342 {
1343 return buffer;
1344 }
1345
1346 @Override
1347 public void complete()
1348 {
1349 bufferPool.release(buffer);
1350
1351 super.complete();
1352
1353 if (frame.getType() == ControlFrameType.GO_AWAY)
1354 {
1355
1356
1357 close();
1358 }
1359 IStream stream = getStream();
1360 if (stream != null && stream.isClosed())
1361 removeStream(stream);
1362 }
1363
1364 @Override
1365 public String toString()
1366 {
1367 return frame.toString();
1368 }
1369 }
1370
1371 private class DataFrameBytes extends AbstractFrameBytes
1372 {
1373 private final DataInfo dataInfo;
1374 private int size;
1375 private volatile ByteBuffer buffer;
1376
1377 private DataFrameBytes(IStream stream, Callback handler, DataInfo dataInfo)
1378 {
1379 super(stream, handler);
1380 this.dataInfo = dataInfo;
1381 }
1382
1383 @Override
1384 public ByteBuffer getByteBuffer()
1385 {
1386 try
1387 {
1388 IStream stream = getStream();
1389 int windowSize = stream.getWindowSize();
1390 if (windowSize <= 0)
1391 return null;
1392
1393 size = dataInfo.available();
1394 if (size > windowSize)
1395 size = windowSize;
1396
1397 buffer = generator.data(stream.getId(), size, dataInfo);
1398 return buffer;
1399 }
1400 catch (Throwable x)
1401 {
1402 fail(x);
1403 return null;
1404 }
1405 }
1406
1407 @Override
1408 public void complete()
1409 {
1410 bufferPool.release(buffer);
1411 IStream stream = getStream();
1412 dataInfo.consume(size);
1413 flowControlStrategy.updateWindow(StandardSession.this, stream, -size);
1414 if (dataInfo.available() > 0)
1415 {
1416
1417
1418
1419 prepend(this);
1420 flush();
1421 }
1422 else
1423 {
1424 super.complete();
1425 stream.updateCloseState(dataInfo.isClose(), true);
1426 if (stream.isClosed())
1427 removeStream(stream);
1428 }
1429 }
1430
1431 @Override
1432 public String toString()
1433 {
1434 return String.format("DATA bytes @%x available=%d consumed=%d on %s", dataInfo.hashCode(), dataInfo.available(), dataInfo.consumed(), getStream());
1435 }
1436 }
1437
1438 private class CloseFrameBytes extends AbstractFrameBytes
1439 {
1440 private CloseFrameBytes()
1441 {
1442 super(null, new Callback.Adapter());
1443 }
1444
1445 @Override
1446 public ByteBuffer getByteBuffer()
1447 {
1448 return BufferUtil.EMPTY_BUFFER;
1449 }
1450
1451 @Override
1452 public void complete()
1453 {
1454 super.complete();
1455 close();
1456 }
1457 }
1458
1459 private static class PingInfoCallback extends PingResultInfo implements Callback
1460 {
1461 private final Promise<PingResultInfo> promise;
1462
1463 public PingInfoCallback(int pingId, Promise<PingResultInfo> promise)
1464 {
1465 super(pingId);
1466 this.promise = promise;
1467 }
1468
1469 @Override
1470 public void succeeded()
1471 {
1472 if (promise != null)
1473 promise.succeeded(this);
1474 }
1475
1476 @Override
1477 public void failed(Throwable x)
1478 {
1479 if (promise != null)
1480 promise.failed(x);
1481 }
1482 }
1483 }