1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.InterruptedByTimeoutException;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.HashSet;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.concurrent.ConcurrentHashMap;
32 import java.util.concurrent.ConcurrentMap;
33 import java.util.concurrent.CopyOnWriteArrayList;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.Future;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledFuture;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicInteger;
41
42 import org.eclipse.jetty.spdy.api.ByteBufferDataInfo;
43 import org.eclipse.jetty.spdy.api.DataInfo;
44 import org.eclipse.jetty.spdy.api.GoAwayInfo;
45 import org.eclipse.jetty.spdy.api.Handler;
46 import org.eclipse.jetty.spdy.api.PingInfo;
47 import org.eclipse.jetty.spdy.api.RstInfo;
48 import org.eclipse.jetty.spdy.api.SPDYException;
49 import org.eclipse.jetty.spdy.api.Session;
50 import org.eclipse.jetty.spdy.api.SessionFrameListener;
51 import org.eclipse.jetty.spdy.api.SessionStatus;
52 import org.eclipse.jetty.spdy.api.Settings;
53 import org.eclipse.jetty.spdy.api.SettingsInfo;
54 import org.eclipse.jetty.spdy.api.Stream;
55 import org.eclipse.jetty.spdy.api.StreamFrameListener;
56 import org.eclipse.jetty.spdy.api.StreamStatus;
57 import org.eclipse.jetty.spdy.api.SynInfo;
58 import org.eclipse.jetty.spdy.frames.ControlFrame;
59 import org.eclipse.jetty.spdy.frames.ControlFrameType;
60 import org.eclipse.jetty.spdy.frames.CredentialFrame;
61 import org.eclipse.jetty.spdy.frames.DataFrame;
62 import org.eclipse.jetty.spdy.frames.GoAwayFrame;
63 import org.eclipse.jetty.spdy.frames.HeadersFrame;
64 import org.eclipse.jetty.spdy.frames.PingFrame;
65 import org.eclipse.jetty.spdy.frames.RstStreamFrame;
66 import org.eclipse.jetty.spdy.frames.SettingsFrame;
67 import org.eclipse.jetty.spdy.frames.SynReplyFrame;
68 import org.eclipse.jetty.spdy.frames.SynStreamFrame;
69 import org.eclipse.jetty.spdy.frames.WindowUpdateFrame;
70 import org.eclipse.jetty.spdy.generator.Generator;
71 import org.eclipse.jetty.spdy.parser.Parser;
72 import org.eclipse.jetty.util.Atomics;
73 import org.eclipse.jetty.util.component.AggregateLifeCycle;
74 import org.eclipse.jetty.util.component.Dumpable;
75 import org.eclipse.jetty.util.log.Log;
76 import org.eclipse.jetty.util.log.Logger;
77
78 public class StandardSession implements ISession, Parser.Listener, Handler<StandardSession.FrameBytes>, Dumpable
79 {
// Logger for this class; note that it is obtained for the Session API interface.
private static final Logger logger = Log.getLogger(Session.class);
// Per-thread counter read and updated by complete(): it tracks how many handler
// invocations are currently nested on the calling thread.
private static final ThreadLocal<Integer> handlerInvocations = new ThreadLocal<Integer>()
{
    @Override
    protected Integer initialValue()
    {
        return 0;
    }
};

// Key/value attributes exposed through getAttribute/setAttribute/removeAttribute.
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
// Listeners registered via addListener(); StreamListener instances are notified
// of stream creation and closure.
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
// Streams currently known to this session, keyed by stream id.
private final ConcurrentMap<Integer, IStream> streams = new ConcurrentHashMap<>();
// Frames waiting to be written; flush/append/prepend/completed/failed access it
// while synchronized on the queue itself.
private final LinkedList<FrameBytes> queue = new LinkedList<>();
// Pool to which frame buffers are released once written.
private final ByteBufferPool bufferPool;
// Executor used by complete() to run handler callbacks off the calling thread.
private final Executor threadPool;
// Scheduler used to arm per-frame timeout tasks.
private final ScheduledExecutorService scheduler;
// SPDY protocol version used when building frames.
private final short version;
// Performs the actual writes and closes; may be null (write() and close() check for it).
private final Controller<FrameBytes> controller;
// Notified around the processing of every incoming frame; may be null.
private final IdleListener idleListener;
// Source of locally created stream ids, advanced by 2 for each syn().
private final AtomicInteger streamIds;
// Source of locally created ping ids, advanced by 2 for each ping().
private final AtomicInteger pingIds;
// Session-level listener notified of incoming frames and exceptions; may be null.
private final SessionFrameListener listener;
// Turns frames into ByteBuffers.
private final Generator generator;
// Set once a GO_AWAY has been requested locally; incoming frames are skipped afterwards.
private final AtomicBoolean goAwaySent = new AtomicBoolean();
// Set once a GO_AWAY has been received from the peer.
private final AtomicBoolean goAwayReceived = new AtomicBoolean();
// Highest id seen by updateLastStreamId(); reported in outgoing GO_AWAY frames.
private final AtomicInteger lastStreamId = new AtomicInteger();
// Strategy that receives all flow-control events (new stream, data, window updates).
private final FlowControlStrategy flowControlStrategy;
// True while a write is in progress; accessed while synchronized on the queue.
private boolean flushing;
// Write failure recorded by failed(); once set, append/prepend fail new frames immediately.
private Throwable failure;
110
/**
 * Creates a session bound to the given collaborators.
 *
 * @param version the SPDY version used when building frames
 * @param bufferPool the pool to which written frame buffers are released
 * @param threadPool the executor used to dispatch handler callbacks
 * @param scheduler the scheduler used to arm frame timeouts
 * @param controller the object that performs writes and closes (may be null)
 * @param idleListener notified around incoming frame processing (may be null)
 * @param initialStreamId the first value used for both stream ids and ping ids
 * @param listener the session-level frame listener (may be null)
 * @param generator the generator that serializes frames
 * @param flowControlStrategy the strategy receiving flow-control events
 */
public StandardSession(short version, ByteBufferPool bufferPool, Executor threadPool, ScheduledExecutorService scheduler,
        Controller<FrameBytes> controller, IdleListener idleListener, int initialStreamId, SessionFrameListener listener,
        Generator generator, FlowControlStrategy flowControlStrategy)
{
    // Protocol configuration.
    this.version = version;
    this.generator = generator;
    this.flowControlStrategy = flowControlStrategy;

    // Infrastructure.
    this.bufferPool = bufferPool;
    this.threadPool = threadPool;
    this.scheduler = scheduler;
    this.controller = controller;

    // Listeners.
    this.idleListener = idleListener;
    this.listener = listener;

    // Both id sequences start from the same initial value.
    this.streamIds = new AtomicInteger(initialStreamId);
    this.pingIds = new AtomicInteger(initialStreamId);
}
127
128 @Override
129 public short getVersion()
130 {
131 return version;
132 }
133
134 @Override
135 public void addListener(Listener listener)
136 {
137 listeners.add(listener);
138 }
139
140 @Override
141 public void removeListener(Listener listener)
142 {
143 listeners.remove(listener);
144 }
145
146 @Override
147 public Future<Stream> syn(SynInfo synInfo, StreamFrameListener listener)
148 {
149 Promise<Stream> result = new Promise<>();
150 syn(synInfo,listener,0,TimeUnit.MILLISECONDS,result);
151 return result;
152 }
153
154 @Override
155 public void syn(SynInfo synInfo, StreamFrameListener listener, long timeout, TimeUnit unit, Handler<Stream> handler)
156 {
157
158
159
160
161
162
163 int associatedStreamId = 0;
164 if (synInfo instanceof PushSynInfo)
165 associatedStreamId = ((PushSynInfo)synInfo).getAssociatedStreamId();
166
167 synchronized (this)
168 {
169 int streamId = streamIds.getAndAdd(2);
170
171 SynStreamFrame synStream = new SynStreamFrame(version, synInfo.getFlags(), streamId, associatedStreamId, synInfo.getPriority(), (short)0, synInfo.getHeaders());
172 IStream stream = createStream(synStream, listener, true);
173 generateAndEnqueueControlFrame(stream, synStream, timeout, unit, handler, stream);
174 }
175 flush();
176 }
177
178 @Override
179 public Future<Void> rst(RstInfo rstInfo)
180 {
181 Promise<Void> result = new Promise<>();
182 rst(rstInfo,0,TimeUnit.MILLISECONDS,result);
183 return result;
184 }
185
186 @Override
187 public void rst(RstInfo rstInfo, long timeout, TimeUnit unit, Handler<Void> handler)
188 {
189
190 if (goAwaySent.get())
191 {
192 complete(handler,null);
193 }
194 else
195 {
196 int streamId = rstInfo.getStreamId();
197 IStream stream = streams.get(streamId);
198 RstStreamFrame frame = new RstStreamFrame(version,streamId,rstInfo.getStreamStatus().getCode(version));
199 control(stream,frame,timeout,unit,handler,null);
200 if (stream != null)
201 {
202 stream.process(frame);
203 removeStream(stream);
204 }
205 }
206 }
207
208 @Override
209 public Future<Void> settings(SettingsInfo settingsInfo)
210 {
211 Promise<Void> result = new Promise<>();
212 settings(settingsInfo,0,TimeUnit.MILLISECONDS,result);
213 return result;
214 }
215
216 @Override
217 public void settings(SettingsInfo settingsInfo, long timeout, TimeUnit unit, Handler<Void> handler)
218 {
219 SettingsFrame frame = new SettingsFrame(version,settingsInfo.getFlags(),settingsInfo.getSettings());
220 control(null, frame, timeout, unit, handler, null);
221 }
222
223 @Override
224 public Future<PingInfo> ping()
225 {
226 Promise<PingInfo> result = new Promise<>();
227 ping(0, TimeUnit.MILLISECONDS, result);
228 return result;
229 }
230
231 @Override
232 public void ping(long timeout, TimeUnit unit, Handler<PingInfo> handler)
233 {
234 int pingId = pingIds.getAndAdd(2);
235 PingInfo pingInfo = new PingInfo(pingId);
236 PingFrame frame = new PingFrame(version,pingId);
237 control(null,frame,timeout,unit,handler,pingInfo);
238 }
239
240 @Override
241 public Future<Void> goAway()
242 {
243 return goAway(SessionStatus.OK);
244 }
245
246 private Future<Void> goAway(SessionStatus sessionStatus)
247 {
248 Promise<Void> result = new Promise<>();
249 goAway(sessionStatus, 0, TimeUnit.MILLISECONDS, result);
250 return result;
251 }
252
253 @Override
254 public void goAway(long timeout, TimeUnit unit, Handler<Void> handler)
255 {
256 goAway(SessionStatus.OK, timeout, unit, handler);
257 }
258
259 private void goAway(SessionStatus sessionStatus, long timeout, TimeUnit unit, Handler<Void> handler)
260 {
261 if (goAwaySent.compareAndSet(false,true))
262 {
263 if (!goAwayReceived.get())
264 {
265 GoAwayFrame frame = new GoAwayFrame(version,lastStreamId.get(),sessionStatus.getCode());
266 control(null,frame,timeout,unit,handler,null);
267 return;
268 }
269 }
270 complete(handler, null);
271 }
272
273 @Override
274 public Set<Stream> getStreams()
275 {
276 Set<Stream> result = new HashSet<>();
277 result.addAll(streams.values());
278 return result;
279 }
280
281 @Override
282 public IStream getStream(int streamId)
283 {
284 return streams.get(streamId);
285 }
286
287 @Override
288 public Object getAttribute(String key)
289 {
290 return attributes.get(key);
291 }
292
293 @Override
294 public void setAttribute(String key, Object value)
295 {
296 attributes.put(key, value);
297 }
298
299 @Override
300 public Object removeAttribute(String key)
301 {
302 return attributes.remove(key);
303 }
304
305 @Override
306 public void onControlFrame(ControlFrame frame)
307 {
308 notifyIdle(idleListener, false);
309 try
310 {
311 logger.debug("Processing {}", frame);
312
313 if (goAwaySent.get())
314 {
315 logger.debug("Skipped processing of {}", frame);
316 return;
317 }
318
319 switch (frame.getType())
320 {
321 case SYN_STREAM:
322 {
323 onSyn((SynStreamFrame)frame);
324 break;
325 }
326 case SYN_REPLY:
327 {
328 onReply((SynReplyFrame)frame);
329 break;
330 }
331 case RST_STREAM:
332 {
333 onRst((RstStreamFrame)frame);
334 break;
335 }
336 case SETTINGS:
337 {
338 onSettings((SettingsFrame)frame);
339 break;
340 }
341 case NOOP:
342 {
343
344 break;
345 }
346 case PING:
347 {
348 onPing((PingFrame)frame);
349 break;
350 }
351 case GO_AWAY:
352 {
353 onGoAway((GoAwayFrame)frame);
354 break;
355 }
356 case HEADERS:
357 {
358 onHeaders((HeadersFrame)frame);
359 break;
360 }
361 case WINDOW_UPDATE:
362 {
363 onWindowUpdate((WindowUpdateFrame)frame);
364 break;
365 }
366 case CREDENTIAL:
367 {
368 onCredential((CredentialFrame)frame);
369 break;
370 }
371 default:
372 {
373 throw new IllegalStateException();
374 }
375 }
376 }
377 finally
378 {
379 notifyIdle(idleListener, true);
380 }
381 }
382
383 @Override
384 public void onDataFrame(DataFrame frame, ByteBuffer data)
385 {
386 notifyIdle(idleListener, false);
387 try
388 {
389 logger.debug("Processing {}, {} data bytes", frame, data.remaining());
390
391 if (goAwaySent.get())
392 {
393 logger.debug("Skipped processing of {}", frame);
394 return;
395 }
396
397 int streamId = frame.getStreamId();
398 IStream stream = streams.get(streamId);
399 if (stream == null)
400 {
401 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.INVALID_STREAM);
402 logger.debug("Unknown stream {}", rstInfo);
403 rst(rstInfo);
404 }
405 else
406 {
407 processData(stream, frame, data);
408 }
409 }
410 finally
411 {
412 notifyIdle(idleListener, true);
413 }
414 }
415
416 private void notifyIdle(IdleListener listener, boolean idle)
417 {
418 if (listener != null)
419 listener.onIdle(idle);
420 }
421
422 private void processData(final IStream stream, DataFrame frame, ByteBuffer data)
423 {
424 ByteBufferDataInfo dataInfo = new ByteBufferDataInfo(data, frame.isClose(), frame.isCompress())
425 {
426 @Override
427 public void consume(int delta)
428 {
429 super.consume(delta);
430 flowControlStrategy.onDataConsumed(StandardSession.this, stream, this, delta);
431 }
432 };
433 flowControlStrategy.onDataReceived(this, stream, dataInfo);
434 stream.process(dataInfo);
435 if (stream.isClosed())
436 removeStream(stream);
437 }
438
439 @Override
440 public void onStreamException(StreamException x)
441 {
442 notifyOnException(listener, x);
443 rst(new RstInfo(x.getStreamId(),x.getStreamStatus()));
444 }
445
446 @Override
447 public void onSessionException(SessionException x)
448 {
449 Throwable cause = x.getCause();
450 notifyOnException(listener,cause == null?x:cause);
451 goAway(x.getSessionStatus());
452 }
453
454 private void onSyn(SynStreamFrame frame)
455 {
456 IStream stream = createStream(frame, null, false);
457 if (stream != null)
458 processSyn(listener, stream, frame);
459 }
460
461 private void processSyn(SessionFrameListener listener, IStream stream, SynStreamFrame frame)
462 {
463 stream.process(frame);
464
465 updateLastStreamId(stream);
466 SynInfo synInfo = new SynInfo(frame.getHeaders(),frame.isClose(),frame.getPriority());
467 StreamFrameListener streamListener = notifyOnSyn(listener,stream,synInfo);
468 stream.setStreamFrameListener(streamListener);
469 flush();
470
471 if (stream.isClosed())
472 removeStream(stream);
473 }
474
475 private IStream createStream(SynStreamFrame frame, StreamFrameListener listener, boolean local)
476 {
477 IStream stream = newStream(frame);
478 stream.updateCloseState(frame.isClose(), local);
479 stream.setStreamFrameListener(listener);
480
481 if (stream.isUnidirectional())
482 {
483
484 stream.updateCloseState(true, !local);
485 if (!stream.isClosed())
486 stream.getAssociatedStream().associate(stream);
487 }
488
489 int streamId = stream.getId();
490 if (streams.putIfAbsent(streamId, stream) != null)
491 {
492 if (local)
493 throw new IllegalStateException("Duplicate stream id " + streamId);
494 RstInfo rstInfo = new RstInfo(streamId, StreamStatus.PROTOCOL_ERROR);
495 logger.debug("Duplicate stream, {}", rstInfo);
496 rst(rstInfo);
497 return null;
498 }
499 else
500 {
501 logger.debug("Created {}", stream);
502 if (local)
503 notifyStreamCreated(stream);
504 return stream;
505 }
506 }
507
508 private IStream newStream(SynStreamFrame frame)
509 {
510 IStream associatedStream = streams.get(frame.getAssociatedStreamId());
511 IStream stream = new StandardStream(frame.getStreamId(), frame.getPriority(), this, associatedStream);
512 flowControlStrategy.onNewStream(this, stream);
513 return stream;
514 }
515
516 private void notifyStreamCreated(IStream stream)
517 {
518 for (Listener listener : listeners)
519 {
520 if (listener instanceof StreamListener)
521 {
522 try
523 {
524 ((StreamListener)listener).onStreamCreated(stream);
525 }
526 catch (Exception x)
527 {
528 logger.info("Exception while notifying listener " + listener, x);
529 }
530 catch (Error x)
531 {
532 logger.info("Exception while notifying listener " + listener, x);
533 throw x;
534 }
535 }
536 }
537 }
538
539 private void removeStream(IStream stream)
540 {
541 if (stream.isUnidirectional())
542 stream.getAssociatedStream().disassociate(stream);
543
544 IStream removed = streams.remove(stream.getId());
545 if (removed != null)
546 assert removed == stream;
547
548 logger.debug("Removed {}", stream);
549 notifyStreamClosed(stream);
550 }
551
552 private void notifyStreamClosed(IStream stream)
553 {
554 for (Listener listener : listeners)
555 {
556 if (listener instanceof StreamListener)
557 {
558 try
559 {
560 ((StreamListener)listener).onStreamClosed(stream);
561 }
562 catch (Exception x)
563 {
564 logger.info("Exception while notifying listener " + listener, x);
565 }
566 catch (Error x)
567 {
568 logger.info("Exception while notifying listener " + listener, x);
569 throw x;
570 }
571 }
572 }
573 }
574
575 private void onReply(SynReplyFrame frame)
576 {
577 int streamId = frame.getStreamId();
578 IStream stream = streams.get(streamId);
579 if (stream == null)
580 {
581 RstInfo rstInfo = new RstInfo(streamId,StreamStatus.INVALID_STREAM);
582 logger.debug("Unknown stream {}",rstInfo);
583 rst(rstInfo);
584 }
585 else
586 {
587 processReply(stream,frame);
588 }
589 }
590
591 private void processReply(IStream stream, SynReplyFrame frame)
592 {
593 stream.process(frame);
594 if (stream.isClosed())
595 removeStream(stream);
596 }
597
598 private void onRst(RstStreamFrame frame)
599 {
600 IStream stream = streams.get(frame.getStreamId());
601
602 if (stream != null)
603 stream.process(frame);
604
605 RstInfo rstInfo = new RstInfo(frame.getStreamId(),StreamStatus.from(frame.getVersion(),frame.getStatusCode()));
606 notifyOnRst(listener, rstInfo);
607 flush();
608
609 if (stream != null)
610 removeStream(stream);
611 }
612
613 private void onSettings(SettingsFrame frame)
614 {
615 Settings.Setting windowSizeSetting = frame.getSettings().get(Settings.ID.INITIAL_WINDOW_SIZE);
616 if (windowSizeSetting != null)
617 {
618 int windowSize = windowSizeSetting.value();
619 setWindowSize(windowSize);
620 logger.debug("Updated session window size to {}", windowSize);
621 }
622 SettingsInfo settingsInfo = new SettingsInfo(frame.getSettings(), frame.isClearPersisted());
623 notifyOnSettings(listener, settingsInfo);
624 flush();
625 }
626
627 private void onPing(PingFrame frame)
628 {
629 int pingId = frame.getPingId();
630 if (pingId % 2 == pingIds.get() % 2)
631 {
632 PingInfo pingInfo = new PingInfo(frame.getPingId());
633 notifyOnPing(listener, pingInfo);
634 flush();
635 }
636 else
637 {
638 control(null, frame, 0, TimeUnit.MILLISECONDS, null, null);
639 }
640 }
641
642 private void onGoAway(GoAwayFrame frame)
643 {
644 if (goAwayReceived.compareAndSet(false, true))
645 {
646 GoAwayInfo goAwayInfo = new GoAwayInfo(frame.getLastStreamId(),SessionStatus.from(frame.getStatusCode()));
647 notifyOnGoAway(listener,goAwayInfo);
648 flush();
649
650
651
652 close();
653 }
654 }
655
656 private void onHeaders(HeadersFrame frame)
657 {
658 int streamId = frame.getStreamId();
659 IStream stream = streams.get(streamId);
660 if (stream == null)
661 {
662 RstInfo rstInfo = new RstInfo(streamId,StreamStatus.INVALID_STREAM);
663 logger.debug("Unknown stream, {}",rstInfo);
664 rst(rstInfo);
665 }
666 else
667 {
668 processHeaders(stream,frame);
669 }
670 }
671
672 private void processHeaders(IStream stream, HeadersFrame frame)
673 {
674 stream.process(frame);
675 if (stream.isClosed())
676 removeStream(stream);
677 }
678
679 private void onWindowUpdate(WindowUpdateFrame frame)
680 {
681 int streamId = frame.getStreamId();
682 IStream stream = streams.get(streamId);
683 flowControlStrategy.onWindowUpdate(this, stream, frame.getWindowDelta());
684 flush();
685 }
686
687 private void onCredential(CredentialFrame frame)
688 {
689 logger.warn("{} frame not yet supported", ControlFrameType.CREDENTIAL);
690 flush();
691 }
692
693 protected void close()
694 {
695
696 if (controller != null)
697 controller.close(false);
698 }
699
700 private void notifyOnException(SessionFrameListener listener, Throwable x)
701 {
702 try
703 {
704 if (listener != null)
705 {
706 logger.debug("Invoking callback with {} on listener {}",x,listener);
707 listener.onException(x);
708 }
709 }
710 catch (Exception xx)
711 {
712 logger.info("Exception while notifying listener " + listener, xx);
713 }
714 catch (Error xx)
715 {
716 logger.info("Exception while notifying listener " + listener, xx);
717 throw xx;
718 }
719 }
720
721 private StreamFrameListener notifyOnSyn(SessionFrameListener listener, Stream stream, SynInfo synInfo)
722 {
723 try
724 {
725 if (listener == null)
726 return null;
727 logger.debug("Invoking callback with {} on listener {}",synInfo,listener);
728 return listener.onSyn(stream,synInfo);
729 }
730 catch (Exception x)
731 {
732 logger.info("Exception while notifying listener " + listener,x);
733 return null;
734 }
735 catch (Error x)
736 {
737 logger.info("Exception while notifying listener " + listener, x);
738 throw x;
739 }
740 }
741
742 private void notifyOnRst(SessionFrameListener listener, RstInfo rstInfo)
743 {
744 try
745 {
746 if (listener != null)
747 {
748 logger.debug("Invoking callback with {} on listener {}",rstInfo,listener);
749 listener.onRst(this,rstInfo);
750 }
751 }
752 catch (Exception x)
753 {
754 logger.info("Exception while notifying listener " + listener, x);
755 }
756 catch (Error x)
757 {
758 logger.info("Exception while notifying listener " + listener, x);
759 throw x;
760 }
761 }
762
763 private void notifyOnSettings(SessionFrameListener listener, SettingsInfo settingsInfo)
764 {
765 try
766 {
767 if (listener != null)
768 {
769 logger.debug("Invoking callback with {} on listener {}",settingsInfo,listener);
770 listener.onSettings(this, settingsInfo);
771 }
772 }
773 catch (Exception x)
774 {
775 logger.info("Exception while notifying listener " + listener, x);
776 }
777 catch (Error x)
778 {
779 logger.info("Exception while notifying listener " + listener, x);
780 throw x;
781 }
782 }
783
784 private void notifyOnPing(SessionFrameListener listener, PingInfo pingInfo)
785 {
786 try
787 {
788 if (listener != null)
789 {
790 logger.debug("Invoking callback with {} on listener {}",pingInfo,listener);
791 listener.onPing(this, pingInfo);
792 }
793 }
794 catch (Exception x)
795 {
796 logger.info("Exception while notifying listener " + listener, x);
797 }
798 catch (Error x)
799 {
800 logger.info("Exception while notifying listener " + listener, x);
801 throw x;
802 }
803 }
804
805 private void notifyOnGoAway(SessionFrameListener listener, GoAwayInfo goAwayInfo)
806 {
807 try
808 {
809 if (listener != null)
810 {
811 logger.debug("Invoking callback with {} on listener {}",goAwayInfo,listener);
812 listener.onGoAway(this, goAwayInfo);
813 }
814 }
815 catch (Exception x)
816 {
817 logger.info("Exception while notifying listener " + listener, x);
818 }
819 catch (Error x)
820 {
821 logger.info("Exception while notifying listener " + listener, x);
822 throw x;
823 }
824 }
825
826 @Override
827 public <C> void control(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler<C> handler, C context)
828 {
829 generateAndEnqueueControlFrame(stream,frame,timeout,unit,handler,context);
830 flush();
831 }
832
833 private <C> void generateAndEnqueueControlFrame(IStream stream, ControlFrame frame, long timeout, TimeUnit unit, Handler<C> handler, C context)
834 {
835 try
836 {
837
838
839
840 synchronized (this)
841 {
842 ByteBuffer buffer = generator.control(frame);
843 logger.debug("Queuing {} on {}", frame, stream);
844 ControlFrameBytes<C> frameBytes = new ControlFrameBytes<>(stream, handler, context, frame, buffer);
845 if (timeout > 0)
846 frameBytes.task = scheduler.schedule(frameBytes, timeout, unit);
847
848
849 if (ControlFrameType.PING == frame.getType())
850 prepend(frameBytes);
851 else
852 append(frameBytes);
853 }
854 }
855 catch (Exception x)
856 {
857 notifyHandlerFailed(handler, context, x);
858 }
859 }
860
861 private void updateLastStreamId(IStream stream)
862 {
863 int streamId = stream.getId();
864 if (streamId % 2 != streamIds.get() % 2)
865 Atomics.updateMax(lastStreamId, streamId);
866 }
867
868 @Override
869 public <C> void data(IStream stream, DataInfo dataInfo, long timeout, TimeUnit unit, Handler<C> handler, C context)
870 {
871 logger.debug("Queuing {} on {}",dataInfo,stream);
872 DataFrameBytes<C> frameBytes = new DataFrameBytes<>(stream,handler,context,dataInfo);
873 if (timeout > 0)
874 frameBytes.task = scheduler.schedule(frameBytes,timeout,unit);
875 append(frameBytes);
876 flush();
877 }
878
879 private void execute(Runnable task)
880 {
881 threadPool.execute(task);
882 }
883
/**
 * Picks the first writable frame from the queue and writes it.
 * <p>
 * At most one write is in progress at a time: if a write is already pending
 * (or the queue is empty) this method returns immediately. The write is issued
 * outside of the queue lock, with this session as the completion handler.
 */
@Override
public void flush()
{
    FrameBytes frameBytes = null;
    ByteBuffer buffer = null;
    synchronized (queue)
    {
        if (flushing || queue.isEmpty())
            return;

        // Streams for which a frame could not produce a buffer during this pass;
        // later frames of the same streams are skipped.
        Set<IStream> stalledStreams = null;
        // Indexed loop on purpose: getByteBuffer() and fail() below may re-enter
        // flush()/prepend() on this thread and modify the queue while it is scanned.
        for (int i = 0; i < queue.size(); ++i)
        {
            frameBytes = queue.get(i);

            IStream stream = frameBytes.getStream();
            if (stream != null && stalledStreams != null && stalledStreams.contains(stream))
                continue;

            // A null buffer means the frame cannot be written right now
            // (data frames return null when the stream window is not positive).
            buffer = frameBytes.getByteBuffer();
            if (buffer != null)
            {
                queue.remove(i);
                // Frames of a stream that has been reset are failed instead of written;
                // fail() itself triggers another flush() for the remaining frames.
                if (stream != null && stream.isReset())
                {
                    frameBytes.fail(new StreamException(stream.getId(),StreamStatus.INVALID_STREAM));
                    return;
                }
                break;
            }

            if (stalledStreams == null)
                stalledStreams = new HashSet<>();
            if (stream != null)
                stalledStreams.add(stream);

            logger.debug("Flush stalled for {}, {} frame(s) in queue",frameBytes,queue.size());
        }

        // Every queued frame is stalled: nothing to write for now.
        if (buffer == null)
            return;

        // Reset by completed() or failed() once the write finishes.
        flushing = true;
        logger.debug("Flushing {}, {} frame(s) in queue",frameBytes,queue.size());
    }
    write(buffer,this,frameBytes);
}
931
932 private void append(FrameBytes frameBytes)
933 {
934 Throwable failure;
935 synchronized (queue)
936 {
937 failure = this.failure;
938 if (failure == null)
939 {
940 int index = queue.size();
941 while (index > 0)
942 {
943 FrameBytes element = queue.get(index - 1);
944 if (element.compareTo(frameBytes) >= 0)
945 break;
946 --index;
947 }
948 queue.add(index,frameBytes);
949 }
950 }
951
952 if (failure != null)
953 frameBytes.fail(new SPDYException(failure));
954 }
955
956 private void prepend(FrameBytes frameBytes)
957 {
958 Throwable failure;
959 synchronized (queue)
960 {
961 failure = this.failure;
962 if (failure == null)
963 {
964 int index = 0;
965 while (index < queue.size())
966 {
967 FrameBytes element = queue.get(index);
968 if (element.compareTo(frameBytes) <= 0)
969 break;
970 ++index;
971 }
972 queue.add(index,frameBytes);
973 }
974 }
975
976 if (failure != null)
977 frameBytes.fail(new SPDYException(failure));
978 }
979
980 @Override
981 public void completed(FrameBytes frameBytes)
982 {
983 synchronized (queue)
984 {
985 logger.debug("Completed write of {}, {} frame(s) in queue",frameBytes,queue.size());
986 flushing = false;
987 }
988 frameBytes.complete();
989 }
990
991 @Override
992 public void failed(FrameBytes frameBytes, Throwable x)
993 {
994 List<FrameBytes> frameBytesToFail = new ArrayList<>();
995 frameBytesToFail.add(frameBytes);
996
997 synchronized (queue)
998 {
999 failure = x;
1000 String logMessage = String.format("Failed write of %s, failing all %d frame(s) in queue",frameBytes,queue.size());
1001 logger.debug(logMessage,x);
1002 frameBytesToFail.addAll(queue);
1003 queue.clear();
1004 flushing = false;
1005 }
1006
1007 for (FrameBytes fb : frameBytesToFail)
1008 fb.fail(x);
1009 }
1010
1011 protected void write(ByteBuffer buffer, Handler<FrameBytes> handler, FrameBytes frameBytes)
1012 {
1013 if (controller != null)
1014 {
1015 logger.debug("Writing {} frame bytes of {}",buffer.remaining(),frameBytes);
1016 controller.write(buffer,handler,frameBytes);
1017 }
1018 }
1019
1020 private <C> void complete(final Handler<C> handler, final C context)
1021 {
1022
1023
1024
1025
1026 Integer invocations = handlerInvocations.get();
1027 if (invocations >= 4)
1028 {
1029 execute(new Runnable()
1030 {
1031 @Override
1032 public void run()
1033 {
1034 if (handler != null)
1035 notifyHandlerCompleted(handler,context);
1036 flush();
1037 }
1038 });
1039 }
1040 else
1041 {
1042 handlerInvocations.set(invocations + 1);
1043 try
1044 {
1045 if (handler != null)
1046 notifyHandlerCompleted(handler,context);
1047 flush();
1048 }
1049 finally
1050 {
1051 handlerInvocations.set(invocations);
1052 }
1053 }
1054 }
1055
1056 private <C> void notifyHandlerCompleted(Handler<C> handler, C context)
1057 {
1058 try
1059 {
1060 handler.completed(context);
1061 }
1062 catch (Exception x)
1063 {
1064 logger.info("Exception while notifying handler " + handler, x);
1065 }
1066 catch (Error x)
1067 {
1068 logger.info("Exception while notifying handler " + handler, x);
1069 throw x;
1070 }
1071 }
1072
1073 private <C> void notifyHandlerFailed(Handler<C> handler, C context, Throwable x)
1074 {
1075 try
1076 {
1077 if (handler != null)
1078 handler.failed(context, x);
1079 }
1080 catch (Exception xx)
1081 {
1082 logger.info("Exception while notifying handler " + handler, xx);
1083 }
1084 catch (Error xx)
1085 {
1086 logger.info("Exception while notifying handler " + handler, xx);
1087 throw xx;
1088 }
1089 }
1090
1091 public int getWindowSize()
1092 {
1093 return flowControlStrategy.getWindowSize(this);
1094 }
1095
1096 public void setWindowSize(int initialWindowSize)
1097 {
1098 flowControlStrategy.setWindowSize(this, initialWindowSize);
1099 }
1100
1101 public String toString()
1102 {
1103 return String.format("%s@%x{v%d,queuSize=%d,windowSize=%d,streams=%d}", getClass().getSimpleName(), hashCode(), version, queue.size(), getWindowSize(), streams.size());
1104 }
1105
1106
1107 @Override
1108 public String dump()
1109 {
1110 return AggregateLifeCycle.dump(this);
1111 }
1112
1113 @Override
1114 public void dump(Appendable out, String indent) throws IOException
1115 {
1116 AggregateLifeCycle.dumpObject(out,this);
1117 AggregateLifeCycle.dump(out,indent,Collections.singletonList(controller),streams.values());
1118 }
1119
1120
1121
1122 public interface FrameBytes extends Comparable<FrameBytes>
1123 {
1124 public IStream getStream();
1125
1126 public abstract ByteBuffer getByteBuffer();
1127
1128 public abstract void complete();
1129
1130 public abstract void fail(Throwable throwable);
1131 }
1132
1133 private abstract class AbstractFrameBytes<C> implements FrameBytes, Runnable
1134 {
1135 private final IStream stream;
1136 private final Handler<C> handler;
1137 private final C context;
1138 protected volatile ScheduledFuture<?> task;
1139
1140 protected AbstractFrameBytes(IStream stream, Handler<C> handler, C context)
1141 {
1142 this.stream = stream;
1143 this.handler = handler;
1144 this.context = context;
1145 }
1146
1147 @Override
1148 public IStream getStream()
1149 {
1150 return stream;
1151 }
1152
1153 @Override
1154 public int compareTo(FrameBytes that)
1155 {
1156
1157
1158 IStream thisStream = getStream();
1159 IStream thatStream = that.getStream();
1160 if (thisStream == null)
1161 return thatStream == null ? 0 : -1;
1162 if (thatStream == null)
1163 return 1;
1164
1165 return thatStream.getPriority() - thisStream.getPriority();
1166 }
1167
1168 @Override
1169 public void complete()
1170 {
1171 cancelTask();
1172 StandardSession.this.complete(handler,context);
1173 }
1174
1175 @Override
1176 public void fail(Throwable x)
1177 {
1178 cancelTask();
1179 notifyHandlerFailed(handler,context,x);
1180 StandardSession.this.flush();
1181 }
1182
1183 private void cancelTask()
1184 {
1185 ScheduledFuture<?> task = this.task;
1186 if (task != null)
1187 task.cancel(false);
1188 }
1189
1190 @Override
1191 public void run()
1192 {
1193 close();
1194 fail(new InterruptedByTimeoutException());
1195 }
1196 }
1197
1198 private class ControlFrameBytes<C> extends AbstractFrameBytes<C>
1199 {
1200 private final ControlFrame frame;
1201 private final ByteBuffer buffer;
1202
1203 private ControlFrameBytes(IStream stream, Handler<C> handler, C context, ControlFrame frame, ByteBuffer buffer)
1204 {
1205 super(stream,handler,context);
1206 this.frame = frame;
1207 this.buffer = buffer;
1208 }
1209
1210 @Override
1211 public ByteBuffer getByteBuffer()
1212 {
1213 return buffer;
1214 }
1215
1216 @Override
1217 public void complete()
1218 {
1219 bufferPool.release(buffer);
1220
1221 super.complete();
1222
1223 if (frame.getType() == ControlFrameType.GO_AWAY)
1224 {
1225
1226
1227 close();
1228 }
1229 IStream stream = getStream();
1230 if (stream != null && stream.isClosed())
1231 removeStream(stream);
1232 }
1233
1234 @Override
1235 public String toString()
1236 {
1237 return frame.toString();
1238 }
1239 }
1240
1241 private class DataFrameBytes<C> extends AbstractFrameBytes<C>
1242 {
1243 private final DataInfo dataInfo;
1244 private int size;
1245 private volatile ByteBuffer buffer;
1246
1247 private DataFrameBytes(IStream stream, Handler<C> handler, C context, DataInfo dataInfo)
1248 {
1249 super(stream,handler,context);
1250 this.dataInfo = dataInfo;
1251 }
1252
1253 @Override
1254 public ByteBuffer getByteBuffer()
1255 {
1256 try
1257 {
1258 IStream stream = getStream();
1259 int windowSize = stream.getWindowSize();
1260 if (windowSize <= 0)
1261 return null;
1262
1263 size = dataInfo.available();
1264 if (size > windowSize)
1265 size = windowSize;
1266
1267 buffer = generator.data(stream.getId(),size,dataInfo);
1268 return buffer;
1269 }
1270 catch (Throwable x)
1271 {
1272 fail(x);
1273 return null;
1274 }
1275 }
1276
1277 @Override
1278 public void complete()
1279 {
1280 bufferPool.release(buffer);
1281 IStream stream = getStream();
1282 flowControlStrategy.updateWindow(StandardSession.this, stream, -size);
1283 if (dataInfo.available() > 0)
1284 {
1285
1286
1287
1288 prepend(this);
1289 flush();
1290 }
1291 else
1292 {
1293 super.complete();
1294 stream.updateCloseState(dataInfo.isClose(),true);
1295 if (stream.isClosed())
1296 removeStream(stream);
1297 }
1298 }
1299
1300 @Override
1301 public String toString()
1302 {
1303 return String.format("DATA bytes @%x available=%d consumed=%d on %s", dataInfo.hashCode(), dataInfo.available(), dataInfo.consumed(), getStream());
1304 }
1305 }
1306 }