1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.InterruptedByTimeoutException;
25 import java.util.Collections;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Objects;
30 import java.util.Set;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ConcurrentMap;
33 import java.util.concurrent.CopyOnWriteArrayList;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicInteger;
39
40 import org.eclipse.jetty.io.ByteBufferPool;
41 import org.eclipse.jetty.io.EndPoint;
42 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
43 import org.eclipse.jetty.spdy.api.DataInfo;
44 import org.eclipse.jetty.spdy.api.GoAwayInfo;
45 import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
46 import org.eclipse.jetty.spdy.api.PingInfo;
47 import org.eclipse.jetty.spdy.api.PingResultInfo;
48 import org.eclipse.jetty.spdy.api.PushInfo;
49 import org.eclipse.jetty.spdy.api.RstInfo;
50 import org.eclipse.jetty.spdy.api.SPDYException;
51 import org.eclipse.jetty.spdy.api.Session;
52 import org.eclipse.jetty.spdy.api.SessionFrameListener;
53 import org.eclipse.jetty.spdy.api.SessionStatus;
54 import org.eclipse.jetty.spdy.api.Settings;
55 import org.eclipse.jetty.spdy.api.SettingsInfo;
56 import org.eclipse.jetty.spdy.api.Stream;
57 import org.eclipse.jetty.spdy.api.StreamFrameListener;
58 import org.eclipse.jetty.spdy.api.StreamStatus;
59 import org.eclipse.jetty.spdy.api.SynInfo;
60 import org.eclipse.jetty.spdy.frames.ControlFrame;
61 import org.eclipse.jetty.spdy.frames.ControlFrameType;
62 import org.eclipse.jetty.spdy.frames.CredentialFrame;
63 import org.eclipse.jetty.spdy.frames.DataFrame;
64 import org.eclipse.jetty.spdy.frames.GoAwayFrame;
65 import org.eclipse.jetty.spdy.frames.HeadersFrame;
66 import org.eclipse.jetty.spdy.frames.PingFrame;
67 import org.eclipse.jetty.spdy.frames.RstStreamFrame;
68 import org.eclipse.jetty.spdy.frames.SettingsFrame;
69 import org.eclipse.jetty.spdy.frames.SynReplyFrame;
70 import org.eclipse.jetty.spdy.frames.SynStreamFrame;
71 import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
72 import org.eclipse.jetty.spdy.generator.Generator;
73 import org.eclipse.jetty.spdy.parser.Parser;
74 import org.eclipse.jetty.util.Atomics;
75 import org.eclipse.jetty.util.BufferUtil;
76 import org.eclipse.jetty.util.Callback;
77 import org.eclipse.jetty.util.FutureCallback;
78 import org.eclipse.jetty.util.FuturePromise;
79 import org.eclipse.jetty.util.Promise;
80 import org.eclipse.jetty.util.component.ContainerLifeCycle;
81 import org.eclipse.jetty.util.component.Dumpable;
82 import org.eclipse.jetty.util.log.Log;
83 import org.eclipse.jetty.util.log.Logger;
84 import org.eclipse.jetty.util.thread.Scheduler;
85
86 public class StandardSession implements ISession, Parser.Listener, Dumpable
87 {
88 private static final Logger LOG = Log.getLogger(Session.class);
89
90 private final Flusher flusher;
91 private final Map<String, Object> attributes = new ConcurrentHashMap<>();
92 private final List<Listener> listeners = new CopyOnWriteArrayList<>();
93 private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
94 private final ByteBufferPool bufferPool;
95 private final Scheduler scheduler;
96 private final short version;
97 private final Controller controller;
98 private final EndPoint endPoint;
99 private final IdleListener idleListener;
100 private final AtomicInteger streamIds;
101 private final AtomicInteger pingIds;
102 private final SessionFrameListener listener;
103 private final Generator generator;
104 private final AtomicBoolean goAwaySent = new AtomicBoolean();
105 private final AtomicBoolean goAwayReceived = new AtomicBoolean();
106 private final AtomicInteger lastStreamId = new AtomicInteger();
107 private final AtomicInteger localStreamCount = new AtomicInteger(0);
108 private final FlowControlStrategy flowControlStrategy;
109 private volatile int maxConcurrentLocalStreams = -1;
110
/**
 * Creates a session from its collaborators.
 *
 * @param version the protocol version reported by {@link #getVersion()} and used for generated frames
 * @param bufferPool the pool that frame buffers are released to after being written
 * @param scheduler the scheduler used for frame timeouts and handed to new streams
 * @param controller the controller wrapped by the flusher and closed by {@link #close()}
 * @param endPoint the end point providing addresses and the idle timeout for new streams
 * @param idleListener notified around frame processing, may be null
 * @param initialStreamId the seed for both the stream id and the ping id generators
 * @param listener the session level frame listener, may be null
 * @param generator the generator used to serialize frames
 * @param flowControlStrategy the strategy consulted for window accounting
 */
public StandardSession(short version, ByteBufferPool bufferPool, Scheduler scheduler,
                       Controller controller, EndPoint endPoint, IdleListener idleListener, int initialStreamId,
                       SessionFrameListener listener, Generator generator, FlowControlStrategy flowControlStrategy)
{
    // Protocol related collaborators.
    this.version = version;
    this.generator = generator;
    this.flowControlStrategy = flowControlStrategy;

    // I/O related collaborators.
    this.bufferPool = bufferPool;
    this.scheduler = scheduler;
    this.endPoint = endPoint;
    this.controller = controller;
    this.flusher = new Flusher(controller);

    // Listeners.
    this.idleListener = idleListener;
    this.listener = listener;

    // Stream and ping ids share the same starting value.
    this.streamIds = new AtomicInteger(initialStreamId);
    this.pingIds = new AtomicInteger(initialStreamId);
}
129
130 @Override
131 public short getVersion()
132 {
133 return version;
134 }
135
136 @Override
137 public void addListener(Listener listener)
138 {
139 listeners.add(listener);
140 }
141
142 @Override
143 public void removeListener(Listener listener)
144 {
145 listeners.remove(listener);
146 }
147
148 @Override
149 public Stream syn(SynInfo synInfo, StreamFrameListener listener) throws ExecutionException, InterruptedException, TimeoutException
150 {
151 FuturePromise<Stream> result = new FuturePromise<>();
152 syn(synInfo, listener, result);
153 if (synInfo.getTimeout() > 0)
154 return result.get(synInfo.getTimeout(), synInfo.getUnit());
155 else
156 return result.get();
157 }
158
159 @Override
160 public void syn(SynInfo synInfo, StreamFrameListener listener, Promise<Stream> promise)
161 {
162
163
164
165
166
167
168 int associatedStreamId = 0;
169 if (synInfo instanceof PushSynInfo)
170 associatedStreamId = ((PushSynInfo)synInfo).getAssociatedStreamId();
171
172 synchronized (this)
173 {
174 int streamId = streamIds.getAndAdd(2);
175
176 SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, associatedStreamId, synInfo.getPriority(), (short)0, synInfo.getHeaders());
177 IStream stream = createStream(synStream, listener, true, promise);
178 if (stream == null)
179 return;
180 generateAndEnqueueControlFrame(stream, synStream, synInfo.getTimeout(), synInfo.getUnit(), stream);
181 }
182 }
183
184 @Override
185 public void rst(RstInfo rstInfo) throws InterruptedException, ExecutionException, TimeoutException
186 {
187 FutureCallback result = new FutureCallback();
188 rst(rstInfo, result);
189 if (rstInfo.getTimeout() > 0)
190 result.get(rstInfo.getTimeout(), rstInfo.getUnit());
191 else
192 result.get();
193 }
194
195 @Override
196 public void rst(RstInfo rstInfo, Callback callback)
197 {
198
199 if (goAwaySent.get())
200 {
201 complete(callback);
202 }
203 else
204 {
205 int streamId = rstInfo.getStreamId();
206 IStream stream = streams.get(streamId);
207 RstStreamFrame frame = new RstStreamFrame(version, streamId, rstInfo.getStreamStatus().getCode(version));
208 control(stream, frame, rstInfo.getTimeout(), rstInfo.getUnit(), callback);
209 if (stream != null)
210 {
211 stream.process(frame);
212 flusher.removeFrameBytesFromQueue(stream);
213 removeStream(stream);
214 }
215 }
216 }
217
218 @Override
219 public void settings(SettingsInfo settingsInfo) throws ExecutionException, InterruptedException, TimeoutException
220 {
221 FutureCallback result = new FutureCallback();
222 settings(settingsInfo, result);
223 if (settingsInfo.getTimeout() > 0)
224 result.get(settingsInfo.getTimeout(), settingsInfo.getUnit());
225 else
226 result.get();
227 }
228
229 @Override
230 public void settings(SettingsInfo settingsInfo, Callback callback)
231 {
232 SettingsFrame frame = new SettingsFrame(version, settingsInfo.getFlags(), settingsInfo.getSettings());
233 control(null, frame, settingsInfo.getTimeout(), settingsInfo.getUnit(), callback);
234 }
235
236 @Override
237 public PingResultInfo ping(PingInfo pingInfo) throws ExecutionException, InterruptedException, TimeoutException
238 {
239 FuturePromise<PingResultInfo> result = new FuturePromise<>();
240 ping(pingInfo, result);
241 if (pingInfo.getTimeout() > 0)
242 return result.get(pingInfo.getTimeout(), pingInfo.getUnit());
243 else
244 return result.get();
245 }
246
247 @Override
248 public void ping(PingInfo pingInfo, Promise<PingResultInfo> promise)
249 {
250 int pingId = pingIds.getAndAdd(2);
251 PingInfoCallback pingInfoCallback = new PingInfoCallback(pingId, promise);
252 PingFrame frame = new PingFrame(version, pingId);
253 control(null, frame, pingInfo.getTimeout(), pingInfo.getUnit(), pingInfoCallback);
254 }
255
256 @Override
257 public void goAway(GoAwayInfo goAwayInfo) throws ExecutionException, InterruptedException, TimeoutException
258 {
259 goAway(goAwayInfo, SessionStatus.OK);
260 }
261
262 private void goAway(GoAwayInfo goAwayInfo, SessionStatus sessionStatus) throws ExecutionException, InterruptedException, TimeoutException
263 {
264 FutureCallback result = new FutureCallback();
265 goAway(sessionStatus, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), result);
266 if (goAwayInfo.getTimeout() > 0)
267 result.get(goAwayInfo.getTimeout(), goAwayInfo.getUnit());
268 else
269 result.get();
270 }
271
272 @Override
273 public void goAway(GoAwayInfo goAwayInfo, Callback callback)
274 {
275 goAway(SessionStatus.OK, goAwayInfo.getTimeout(), goAwayInfo.getUnit(), callback);
276 }
277
278 private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Callback callback)
279 {
280 if (goAwaySent.compareAndSet(false, true))
281 {
282 if (!goAwayReceived.get())
283 {
284 GoAwayFrame frame = new GoAwayFrame(version, lastStreamId.get(), sessionStatus.getCode());
285 control(null, frame, timeout, unit, callback);
286 return;
287 }
288 }
289 complete(callback);
290 }
291
292 @Override
293 public Set<Stream> getStreams()
294 {
295 Set<Stream> result = new HashSet<>();
296 result.addAll(streams.values());
297 return result;
298 }
299
300 @Override
301 public IStream getStream(int streamId)
302 {
303 return streams.get(streamId);
304 }
305
306 @Override
307 public Object getAttribute(String key)
308 {
309 return attributes.get(key);
310 }
311
312 @Override
313 public void setAttribute(String key, Object value)
314 {
315 attributes.put(key, value);
316 }
317
318 @Override
319 public Object removeAttribute(String key)
320 {
321 return attributes.remove(key);
322 }
323
324 @Override
325 public InetSocketAddress getLocalAddress()
326 {
327 return endPoint.getLocalAddress();
328 }
329
330 @Override
331 public InetSocketAddress getRemoteAddress()
332 {
333 return endPoint.getRemoteAddress();
334 }
335
336 @Override
337 public void onControlFrame(ControlFrame frame)
338 {
339 notifyIdle(idleListener, false);
340 try
341 {
342 LOG.debug("Processing {}", frame);
343
344 if (goAwaySent.get())
345 {
346 LOG.debug("Skipped processing of {}", frame);
347 return;
348 }
349
350 switch (frame.getType())
351 {
352 case SYN_STREAM:
353 {
354 onSyn((SynStreamFrame)frame);
355 break;
356 }
357 case SYN_REPLY:
358 {
359 onReply((SynReplyFrame)frame);
360 break;
361 }
362 case RST_STREAM:
363 {
364 onRst((RstStreamFrame)frame);
365 break;
366 }
367 case SETTINGS:
368 {
369 onSettings((SettingsFrame)frame);
370 break;
371 }
372 case NOOP:
373 {
374
375 break;
376 }
377 case PING:
378 {
379 onPing((PingFrame)frame);
380 break;
381 }
382 case GO_AWAY:
383 {
384 onGoAway((GoAwayFrame)frame);
385 break;
386 }
387 case HEADERS:
388 {
389 onHeaders((HeadersFrame)frame);
390 break;
391 }
392 case WINDOW_UPDATE:
393 {
394 onWindowUpdate((WindowUpdateFrame)frame);
395 break;
396 }
397 case CREDENTIAL:
398 {
399 onCredential((CredentialFrame)frame);
400 break;
401 }
402 default:
403 {
404 throw new IllegalStateException();
405 }
406 }
407 }
408 finally
409 {
410 notifyIdle(idleListener, true);
411 }
412 }
413
414 @Override
415 public void onDataFrame(DataFrame frame, ByteBuffer data)
416 {
417 notifyIdle(idleListener, false);
418 try
419 {
420 LOG.debug("Processing {}, {} data bytes", frame, data.remaining());
421
422 if (goAwaySent.get())
423 {
424 LOG.debug("Skipped processing of {}", frame);
425 return;
426 }
427
428 int streamId = frame.getStreamId();
429 IStream stream = streams.get(streamId);
430 if (stream == null)
431 {
432 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
433 LOG.debug("Unknown stream {}", rstInfo);
434 rst(rstInfo, new Callback.Adapter());
435 }
436 else
437 {
438 processData(stream, frame, data);
439 }
440 }
441 finally
442 {
443 notifyIdle(idleListener, true);
444 }
445 }
446
447 private void notifyIdle(IdleListener listener, boolean idle)
448 {
449 if (listener != null)
450 listener.onIdle(idle);
451 }
452
453 private void processData(final IStream stream, DataFrame frame, ByteBuffer data)
454 {
455 ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data, frame.isClose())
456 {
457 @Override
458 public void consume(int delta)
459 {
460 super.consume(delta);
461 flowControlStrategy.onDataConsumed(StandardSession.this, stream, this, delta);
462 }
463 };
464 flowControlStrategy.onDataReceived(this, stream, dataInfo);
465 stream.process(dataInfo);
466 if (stream.isClosed())
467 removeStream(stream);
468 }
469
470 @Override
471 public void onStreamException(StreamException x)
472 {
473
474 notifyOnException(listener, x);
475 rst(new RstInfo(x.getStreamId(), x.getStreamStatus()), new Callback.Adapter());
476 }
477
478 @Override
479 public void onSessionException(SessionException x)
480 {
481 Throwable cause = x.getCause();
482 notifyOnException(listener, cause == null ? x : cause);
483 goAway(x.getSessionStatus(), 0, TimeUnit.SECONDS, new Callback.Adapter());
484 }
485
486 private void onSyn(final SynStreamFrame frame)
487 {
488 IStream stream = createStream(frame, null, false, new Promise.Adapter<Stream>()
489 {
490 @Override
491 public void failed(Throwable x)
492 {
493 LOG.debug("Received: {} but creating new Stream failed: {}", frame, x.getMessage());
494 }
495 });
496 if (stream != null)
497 processSyn(listener, stream, frame);
498 }
499
500 private void processSyn(SessionFrameListener listener, IStream stream, SynStreamFrame frame)
501 {
502 stream.process(frame);
503
504 updateLastStreamId(stream);
505 StreamFrameListener streamListener;
506 if (stream.isUnidirectional())
507 {
508 PushInfo pushInfo = new PushInfo(frame.getHeaders(), frame.isClose());
509 streamListener = notifyOnPush(stream.getAssociatedStream().getStreamFrameListener(), stream, pushInfo);
510 }
511 else
512 {
513 SynInfo synInfo = new SynInfo(frame.getHeaders(), frame.isClose(), frame.getPriority());
514 streamListener = notifyOnSyn(listener, stream, synInfo);
515 }
516 stream.setStreamFrameListener(streamListener);
517
518 if (stream.isClosed())
519 removeStream(stream);
520 }
521
522 private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local, Promise<Stream> promise)
523 {
524 IStream associatedStream = streams.get(frame.getAssociatedStreamId());
525 IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream,
526 scheduler, promise);
527 stream.setIdleTimeout(endPoint.getIdleTimeout());
528 flowControlStrategy.onNewStream(this, stream);
529
530 stream.updateCloseState(frame.isClose(), local);
531 stream.setStreamFrameListener(listener);
532
533 if (stream.isUnidirectional())
534 {
535
536 stream.updateCloseState(true, !local);
537 if (!stream.isClosed())
538 stream.getAssociatedStream().associate(stream);
539 }
540
541 int streamId = stream.getId();
542
543 if (local)
544 {
545 while (true)
546 {
547 int oldStreamCountValue = localStreamCount.get();
548 int maxConcurrentStreams = maxConcurrentLocalStreams;
549 if (maxConcurrentStreams > -1 && oldStreamCountValue >= maxConcurrentStreams)
550 {
551 String message = String.format("Max concurrent local streams (%d) exceeded.",
552 maxConcurrentStreams);
553 LOG.debug(message);
554 promise.failed(new SPDYException(message));
555 return null;
556 }
557 if (localStreamCount.compareAndSet(oldStreamCountValue, oldStreamCountValue + 1))
558 break;
559 }
560 }
561
562 if (streams.putIfAbsent(streamId, stream) != null)
563 {
564 String message = "Duplicate stream id " + streamId;
565 IllegalStateException duplicateIdException = new IllegalStateException(message);
566 promise.failed(duplicateIdException);
567 if (local)
568 {
569 localStreamCount.decrementAndGet();
570 throw duplicateIdException;
571 }
572 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR);
573 LOG.debug("Duplicate stream, {}", rstInfo);
574 rst(rstInfo, new Callback.Adapter());
575 return null;
576 }
577 else
578 {
579 LOG.debug("Created {}", stream);
580 notifyStreamCreated(stream);
581 return stream;
582 }
583 }
584
585 private void notifyStreamCreated(IStream stream)
586 {
587 for (Listener listener : listeners)
588 {
589 if (listener instanceof StreamListener)
590 {
591 try
592 {
593 ((StreamListener)listener).onStreamCreated(stream);
594 }
595 catch (Exception x)
596 {
597 LOG.info("Exception while notifying listener " + listener, x);
598 }
599 catch (Error x)
600 {
601 LOG.info("Exception while notifying listener " + listener, x);
602 throw x;
603 }
604 }
605 }
606 }
607
608 private void removeStream(IStream stream)
609 {
610 if (stream.isUnidirectional())
611 stream.getAssociatedStream().disassociate(stream);
612
613 IStream removed = streams.remove(stream.getId());
614 if (removed != null)
615 {
616 assert removed == stream;
617
618 if (streamIds.get() % 2 == stream.getId() % 2)
619 localStreamCount.decrementAndGet();
620
621 LOG.debug("Removed {}", stream);
622 notifyStreamClosed(stream);
623 }
624 }
625
626 private void notifyStreamClosed(IStream stream)
627 {
628 for (Listener listener : listeners)
629 {
630 if (listener instanceof StreamListener)
631 {
632 try
633 {
634 ((StreamListener)listener).onStreamClosed(stream);
635 }
636 catch (Exception x)
637 {
638 LOG.info("Exception while notifying listener " + listener, x);
639 }
640 catch (Error x)
641 {
642 LOG.info("Exception while notifying listener " + listener, x);
643 throw x;
644 }
645 }
646 }
647 }
648
649 private void onReply(SynReplyFrame frame)
650 {
651 int streamId = frame.getStreamId();
652 IStream stream = streams.get(streamId);
653 if (stream == null)
654 {
655 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
656 LOG.debug("Unknown stream {}", rstInfo);
657 rst(rstInfo, new Callback.Adapter());
658 }
659 else
660 {
661 processReply(stream, frame);
662 }
663 }
664
665 private void processReply(IStream stream, SynReplyFrame frame)
666 {
667 stream.process(frame);
668 if (stream.isClosed())
669 removeStream(stream);
670 }
671
672 private void onRst(RstStreamFrame frame)
673 {
674 IStream stream = streams.get(frame.getStreamId());
675
676 if (stream != null)
677 stream.process(frame);
678
679 RstInfo rstInfo = new RstInfo(frame.getStreamId(), StreamStatus.from(frame.getVersion(), frame.getStatusCode()));
680 notifyOnRst(listener, rstInfo);
681
682 if (stream != null)
683 removeStream(stream);
684 }
685
686 private void onSettings(SettingsFrame frame)
687 {
688 Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
689 if (windowSizeSetting != null)
690 {
691 int windowSize = windowSizeSetting.value();
692 setWindowSize(windowSize);
693 LOG.debug("Updated session window size to {}", windowSize);
694 }
695 Settings.Setting maxConcurrentStreamsSetting = frame.getSettings().get(Settings.ID.MAX_CONCURRENT_STREAMS);
696 if (maxConcurrentStreamsSetting != null)
697 {
698 int maxConcurrentStreamsValue = maxConcurrentStreamsSetting.value();
699 maxConcurrentLocalStreams = maxConcurrentStreamsValue;
700 LOG.debug("Updated session maxConcurrentLocalStreams to {}", maxConcurrentStreamsValue);
701 }
702 SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted());
703 notifyOnSettings(listener, settingsInfo);
704 }
705
706 private void onPing(PingFrame frame)
707 {
708 int pingId = frame.getPingId();
709 if (pingId % 2 == pingIds.get() % 2)
710 {
711 PingResultInfo pingResultInfo = new PingResultInfo(frame.getPingId());
712 notifyOnPing(listener, pingResultInfo);
713 }
714 else
715 {
716 control(null, frame, 0, TimeUnit.MILLISECONDS, new Callback.Adapter());
717 }
718 }
719
720 private void onGoAway(GoAwayFrame frame)
721 {
722 if (goAwayReceived.compareAndSet(false, true))
723 {
724
725 GoAwayResultInfo goAwayResultInfo = new GoAwayResultInfo(frame.getLastStreamId(), SessionStatus.from(frame.getStatusCode()));
726 notifyOnGoAway(listener, goAwayResultInfo);
727
728
729
730 }
731 }
732
733 private void onHeaders(HeadersFrame frame)
734 {
735 int streamId = frame.getStreamId();
736 IStream stream = streams.get(streamId);
737 if (stream == null)
738 {
739 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
740 LOG.debug("Unknown stream, {}", rstInfo);
741 rst(rstInfo, new Callback.Adapter());
742 }
743 else
744 {
745 processHeaders(stream, frame);
746 }
747 }
748
749 private void processHeaders(IStream stream, HeadersFrame frame)
750 {
751 stream.process(frame);
752 if (stream.isClosed())
753 removeStream(stream);
754 }
755
756 private void onWindowUpdate(WindowUpdateFrame frame)
757 {
758 int streamId = frame.getStreamId();
759 IStream stream = streams.get(streamId);
760 flowControlStrategy.onWindowUpdate(this, stream, frame.getWindowDelta());
761 flusher.flush();
762 }
763
764 private void onCredential(CredentialFrame frame)
765 {
766 LOG.warn("{} frame not yet supported", frame.getType());
767 }
768
769 protected void close()
770 {
771
772 if (controller != null)
773 controller.close(false);
774 }
775
776 private void notifyOnException(SessionFrameListener listener, Throwable x)
777 {
778 try
779 {
780 if (listener != null)
781 {
782 LOG.debug("Invoking callback with {} on listener {}", x, listener);
783 listener.onFailure(this, x);
784 }
785 }
786 catch (Exception xx)
787 {
788 LOG.info("Exception while notifying listener " + listener, xx);
789 }
790 catch (Error xx)
791 {
792 LOG.info("Exception while notifying listener " + listener, xx);
793 throw xx;
794 }
795 }
796
797 private StreamFrameListener notifyOnPush(StreamFrameListener listener, Stream stream, PushInfo pushInfo)
798 {
799 try
800 {
801 if (listener == null)
802 return null;
803 LOG.debug("Invoking callback with {} on listener {}", pushInfo, listener);
804 return listener.onPush(stream, pushInfo);
805 }
806 catch (Exception x)
807 {
808 LOG.info("Exception while notifying listener " + listener, x);
809 return null;
810 }
811 catch (Error x)
812 {
813 LOG.info("Exception while notifying listener " + listener, x);
814 throw x;
815 }
816 }
817
818 private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo)
819 {
820 try
821 {
822 if (listener == null)
823 return null;
824 LOG.debug("Invoking callback with {} on listener {}", synInfo, listener);
825 return listener.onSyn(stream, synInfo);
826 }
827 catch (Exception x)
828 {
829 LOG.info("Exception while notifying listener " + listener, x);
830 return null;
831 }
832 catch (Error x)
833 {
834 LOG.info("Exception while notifying listener " + listener, x);
835 throw x;
836 }
837 }
838
839 private void notifyOnRst(SessionFrameListener listener, RstInfo rstInfo)
840 {
841 try
842 {
843 if (listener != null)
844 {
845 LOG.debug("Invoking callback with {} on listener {}", rstInfo, listener);
846 listener.onRst(this, rstInfo);
847 }
848 }
849 catch (Exception x)
850 {
851 LOG.info("Exception while notifying listener " + listener, x);
852 }
853 catch (Error x)
854 {
855 LOG.info("Exception while notifying listener " + listener, x);
856 throw x;
857 }
858 }
859
860 private void notifyOnSettings(SessionFrameListener listener, SettingsInfo settingsInfo)
861 {
862 try
863 {
864 if (listener != null)
865 {
866 LOG.debug("Invoking callback with {} on listener {}", settingsInfo, listener);
867 listener.onSettings(this, settingsInfo);
868 }
869 }
870 catch (Exception x)
871 {
872 LOG.info("Exception while notifying listener " + listener, x);
873 }
874 catch (Error x)
875 {
876 LOG.info("Exception while notifying listener " + listener, x);
877 throw x;
878 }
879 }
880
881 private void notifyOnPing(SessionFrameListener listener, PingResultInfo pingResultInfo)
882 {
883 try
884 {
885 if (listener != null)
886 {
887 LOG.debug("Invoking callback with {} on listener {}", pingResultInfo, listener);
888 listener.onPing(this, pingResultInfo);
889 }
890 }
891 catch (Exception x)
892 {
893 LOG.info("Exception while notifying listener " + listener, x);
894 }
895 catch (Error x)
896 {
897 LOG.info("Exception while notifying listener " + listener, x);
898 throw x;
899 }
900 }
901
902 private void notifyOnGoAway(SessionFrameListener listener, GoAwayResultInfo goAwayResultInfo)
903 {
904 try
905 {
906 if (listener != null)
907 {
908 LOG.debug("Invoking callback with {} on listener {}", goAwayResultInfo, listener);
909 listener.onGoAway(this, goAwayResultInfo);
910 }
911 }
912 catch (Exception x)
913 {
914 LOG.info("Exception while notifying listener " + listener, x);
915 }
916 catch (Error x)
917 {
918 LOG.info("Exception while notifying listener " + listener, x);
919 throw x;
920 }
921 }
922
923 @Override
924 public void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
925 {
926 generateAndEnqueueControlFrame(stream, frame, timeout, unit, callback);
927 }
928
929 private void generateAndEnqueueControlFrame(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Callback callback)
930 {
931 try
932 {
933
934
935
936 synchronized (this)
937 {
938 ByteBuffer buffer = generator.control(frame);
939 LOG.debug("Queuing {} on {}", frame, stream);
940 ControlFrameBytes frameBytes = new ControlFrameBytes(stream, callback, frame, buffer);
941 if (timeout > 0)
942 frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
943
944
945 if (ControlFrameType.PING == frame.getType())
946 flusher.prepend(frameBytes);
947 else
948 flusher.append(frameBytes);
949 }
950 }
951 catch (Exception x)
952 {
953 notifyCallbackFailed(callback, x);
954 }
955 }
956
957 private void updateLastStreamId(IStream stream)
958 {
959 int streamId = stream.getId();
960 if (streamId % 2 != streamIds.get() % 2)
961 Atomics.updateMax(lastStreamId, streamId);
962 }
963
964 @Override
965 public void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Callback callback)
966 {
967 LOG.debug("Queuing {} on {}", dataInfo, stream);
968 DataFrameBytes frameBytes = new DataFrameBytes(stream, callback, dataInfo);
969 if (timeout > 0)
970 frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
971 flusher.append(frameBytes);
972 }
973
974 @Override
975 public void shutdown()
976 {
977 FrameBytes frameBytes = new CloseFrameBytes();
978 flusher.append(frameBytes);
979 }
980
981 private void complete(final Callback callback)
982 {
983 callback.succeeded();
984 }
985
986 private void notifyCallbackFailed(Callback callback, Throwable x)
987 {
988 try
989 {
990 if (callback != null)
991 callback.failed(x);
992 }
993 catch (Exception xx)
994 {
995 LOG.info("Exception while notifying callback " + callback, xx);
996 }
997 catch (Error xx)
998 {
999 LOG.info("Exception while notifying callback " + callback, xx);
1000 throw xx;
1001 }
1002 }
1003
1004 public int getWindowSize()
1005 {
1006 return flowControlStrategy.getWindowSize(this);
1007 }
1008
1009 public void setWindowSize(int initialWindowSize)
1010 {
1011 flowControlStrategy.setWindowSize(this, initialWindowSize);
1012 }
1013
1014 @Override
1015 public String toString()
1016 {
1017 return String.format("%s@%x{v%d,queueSize=%d,windowSize=%d,streams=%d}", getClass().getSimpleName(),
1018 hashCode(), version, flusher.getQueueSize(), getWindowSize(), streams.size());
1019 }
1020
1021 @Override
1022 public String dump()
1023 {
1024 return ContainerLifeCycle.dump(this);
1025 }
1026
1027 @Override
1028 public void dump(Appendable out, String indent) throws IOException
1029 {
1030 ContainerLifeCycle.dumpObject(out, this);
1031 ContainerLifeCycle.dump(out, indent, Collections.singletonList(controller), streams.values());
1032 }
1033
1034 public interface FrameBytes extends Comparable<FrameBytes>, Callback
1035 {
1036 public IStream getStream();
1037
1038 public abstract ByteBuffer getByteBuffer();
1039 }
1040
1041 abstract class AbstractFrameBytes implements FrameBytes, Runnable
1042 {
1043 private final IStream stream;
1044 private final Callback callback;
1045 protected volatile Scheduler.Task task;
1046
1047 protected AbstractFrameBytes(IStream stream, Callback callback)
1048 {
1049 this.stream = stream;
1050 this.callback = Objects.requireNonNull(callback);
1051 }
1052
1053 @Override
1054 public IStream getStream()
1055 {
1056 return stream;
1057 }
1058
1059 @Override
1060 public int compareTo(FrameBytes that)
1061 {
1062
1063
1064 IStream thisStream = getStream();
1065 IStream thatStream = that.getStream();
1066 if (thisStream == null)
1067 return thatStream == null ? 0 : -1;
1068 if (thatStream == null)
1069 return 1;
1070
1071 return thatStream.getPriority() - thisStream.getPriority();
1072 }
1073
1074 private void cancelTask()
1075 {
1076 Scheduler.Task task = this.task;
1077 if (task != null)
1078 task.cancel();
1079 }
1080
1081 @Override
1082 public void run()
1083 {
1084 close();
1085 failed(new InterruptedByTimeoutException());
1086 }
1087
1088 @Override
1089 public void succeeded()
1090 {
1091 cancelTask();
1092 StandardSession.this.complete(callback);
1093 }
1094
1095 @Override
1096 public void failed(Throwable x)
1097 {
1098 cancelTask();
1099 notifyCallbackFailed(callback, x);
1100 }
1101 }
1102
1103 class ControlFrameBytes extends AbstractFrameBytes
1104 {
1105 private final ControlFrame frame;
1106 private final ByteBuffer buffer;
1107
1108 private ControlFrameBytes(IStream stream, Callback callback, ControlFrame frame, ByteBuffer buffer)
1109 {
1110 super(stream, callback);
1111 this.frame = frame;
1112 this.buffer = buffer;
1113 }
1114
1115 @Override
1116 public ByteBuffer getByteBuffer()
1117 {
1118 return buffer;
1119 }
1120
1121 @Override
1122 public void succeeded()
1123 {
1124 bufferPool.release(buffer);
1125
1126 super.succeeded();
1127
1128 if (frame.getType() == ControlFrameType.GO_AWAY)
1129 {
1130
1131
1132 close();
1133 }
1134 IStream stream = getStream();
1135 if (stream != null && stream.isClosed())
1136 removeStream(stream);
1137 }
1138
1139 @Override
1140 public String toString()
1141 {
1142 return frame.toString();
1143 }
1144 }
1145
1146 private class DataFrameBytes extends AbstractFrameBytes
1147 {
1148 private final DataInfo dataInfo;
1149 private int size;
1150 private volatile ByteBuffer buffer;
1151
1152 private DataFrameBytes(IStream stream, Callback handler, DataInfo dataInfo)
1153 {
1154 super(stream, handler);
1155 this.dataInfo = dataInfo;
1156 }
1157
1158 @Override
1159 public ByteBuffer getByteBuffer()
1160 {
1161 try
1162 {
1163 IStream stream = getStream();
1164 int windowSize = stream.getWindowSize();
1165 if (windowSize <= 0)
1166 return null;
1167
1168 size = dataInfo.available();
1169 if (size > windowSize)
1170 size = windowSize;
1171
1172 buffer = generator.data(stream.getId(), size, dataInfo);
1173 return buffer;
1174 }
1175 catch (Throwable x)
1176 {
1177 failed(x);
1178 return null;
1179 }
1180 }
1181
1182 @Override
1183 public void succeeded()
1184 {
1185 bufferPool.release(buffer);
1186 IStream stream = getStream();
1187 dataInfo.consume(size);
1188 flowControlStrategy.updateWindow(StandardSession.this, stream, -size);
1189 if (dataInfo.available() > 0)
1190 {
1191
1192
1193
1194 flusher.prepend(this);
1195 }
1196 else
1197 {
1198 super.succeeded();
1199 stream.updateCloseState(dataInfo.isClose(), true);
1200 if (stream.isClosed())
1201 removeStream(stream);
1202 }
1203 }
1204
1205 @Override
1206 public String toString()
1207 {
1208 return String.format("DATA bytes @%x available=%d consumed=%d on %s", dataInfo.hashCode(), dataInfo.available(), dataInfo.consumed(), getStream());
1209 }
1210 }
1211
1212 private class CloseFrameBytes extends AbstractFrameBytes
1213 {
1214 private CloseFrameBytes()
1215 {
1216 super(null, new Callback.Adapter());
1217 }
1218
1219 @Override
1220 public ByteBuffer getByteBuffer()
1221 {
1222 return BufferUtil.EMPTY_BUFFER;
1223 }
1224
1225 @Override
1226 public void succeeded()
1227 {
1228 super.succeeded();
1229 close();
1230 }
1231 }
1232
1233 private static class PingInfoCallback extends PingResultInfo implements Callback
1234 {
1235 private final Promise<PingResultInfo> promise;
1236
1237 public PingInfoCallback(int pingId, Promise<PingResultInfo> promise)
1238 {
1239 super(pingId);
1240 this.promise = promise;
1241 }
1242
1243 @Override
1244 public void succeeded()
1245 {
1246 if (promise != null)
1247 promise.succeeded(this);
1248 }
1249
1250 @Override
1251 public void failed(Throwable x)
1252 {
1253 if (promise != null)
1254 promise.failed(x);
1255 }
1256 }
1257 }