1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy;
20
21 import java.util.Collections;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.TimeoutException;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import org.eclipse.jetty.io.IdleTimeout;
30 import org.eclipse.jetty.spdy.api.DataInfo;
31 import org.eclipse.jetty.spdy.api.HeadersInfo;
32 import org.eclipse.jetty.spdy.api.PushInfo;
33 import org.eclipse.jetty.spdy.api.ReplyInfo;
34 import org.eclipse.jetty.spdy.api.RstInfo;
35 import org.eclipse.jetty.spdy.api.Stream;
36 import org.eclipse.jetty.spdy.api.StreamFrameListener;
37 import org.eclipse.jetty.spdy.api.StreamStatus;
38 import org.eclipse.jetty.spdy.frames.ControlFrame;
39 import org.eclipse.jetty.spdy.frames.HeadersFrame;
40 import org.eclipse.jetty.spdy.frames.SynReplyFrame;
41 import org.eclipse.jetty.util.Callback;
42 import org.eclipse.jetty.util.FutureCallback;
43 import org.eclipse.jetty.util.FuturePromise;
44 import org.eclipse.jetty.util.Promise;
45 import org.eclipse.jetty.util.log.Log;
46 import org.eclipse.jetty.util.log.Logger;
47 import org.eclipse.jetty.util.thread.Scheduler;
48
49 public class StandardStream extends IdleTimeout implements IStream
50 {
51 private static final Logger LOG = Log.getLogger(Stream.class);
52 private final Map<String, Object> attributes = new ConcurrentHashMap<>();
53 private final int id;
54 private final byte priority;
55 private final ISession session;
56 private final IStream associatedStream;
57 private final Promise<Stream> promise;
58 private final AtomicInteger windowSize = new AtomicInteger();
59 private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>());
60 private volatile StreamFrameListener listener;
61 private volatile OpenState openState = OpenState.SYN_SENT;
62 private volatile CloseState closeState = CloseState.OPENED;
63 private volatile boolean reset = false;
64
65 public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Scheduler scheduler, Promise<Stream> promise)
66 {
67 super(scheduler);
68 this.id = id;
69 this.priority = priority;
70 this.session = session;
71 this.associatedStream = associatedStream;
72 this.promise = promise;
73 }
74
75 @Override
76 public int getId()
77 {
78 return id;
79 }
80
81 @Override
82 public IStream getAssociatedStream()
83 {
84 return associatedStream;
85 }
86
87 @Override
88 public Set<Stream> getPushedStreams()
89 {
90 return pushedStreams;
91 }
92
93 @Override
94 public void associate(IStream stream)
95 {
96 pushedStreams.add(stream);
97 }
98
99 @Override
100 public void disassociate(IStream stream)
101 {
102 pushedStreams.remove(stream);
103 }
104
105 @Override
106 public byte getPriority()
107 {
108 return priority;
109 }
110
111 @Override
112 protected void onIdleExpired(TimeoutException timeout)
113 {
114 StreamFrameListener listener = this.listener;
115 if (listener != null)
116 listener.onFailure(this, timeout);
117
118
119 close();
120 }
121
122 private void close()
123 {
124 closeState = CloseState.CLOSED;
125 onClose();
126 }
127
128 @Override
129 public boolean isOpen()
130 {
131 return !isClosed();
132 }
133
134 @Override
135 public int getWindowSize()
136 {
137 return windowSize.get();
138 }
139
140 @Override
141 public void updateWindowSize(int delta)
142 {
143 int size = windowSize.addAndGet(delta);
144 if (LOG.isDebugEnabled())
145 LOG.debug("Updated window size {} -> {} for {}", size - delta, size, this);
146 }
147
148 @Override
149 public ISession getSession()
150 {
151 return session;
152 }
153
154 @Override
155 public Object getAttribute(String key)
156 {
157 return attributes.get(key);
158 }
159
160 @Override
161 public void setAttribute(String key, Object value)
162 {
163 attributes.put(key, value);
164 }
165
166 @Override
167 public Object removeAttribute(String key)
168 {
169 return attributes.remove(key);
170 }
171
172 @Override
173 public void setStreamFrameListener(StreamFrameListener listener)
174 {
175 this.listener = listener;
176 }
177
178 @Override
179 public StreamFrameListener getStreamFrameListener()
180 {
181 return listener;
182 }
183
184 @Override
185 public void updateCloseState(boolean close, boolean local)
186 {
187 if (LOG.isDebugEnabled())
188 LOG.debug("{} close={} local={}", this, close, local);
189 if (close)
190 {
191 switch (closeState)
192 {
193 case OPENED:
194 {
195 closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
196 break;
197 }
198 case LOCALLY_CLOSED:
199 {
200 if (local)
201 throw new IllegalStateException();
202 else
203 close();
204 break;
205 }
206 case REMOTELY_CLOSED:
207 {
208 if (local)
209 close();
210 else
211 throw new IllegalStateException();
212 break;
213 }
214 default:
215 {
216 LOG.warn("Already CLOSED! {} local={}", this, local);
217 }
218 }
219 }
220 }
221
222 @Override
223 public void process(ControlFrame frame)
224 {
225 notIdle();
226 switch (frame.getType())
227 {
228 case SYN_STREAM:
229 {
230 openState = OpenState.SYN_RECV;
231 break;
232 }
233 case SYN_REPLY:
234 {
235 openState = OpenState.REPLY_RECV;
236 SynReplyFrame synReply = (SynReplyFrame)frame;
237 updateCloseState(synReply.isClose(), false);
238 ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(), synReply.isClose());
239 notifyOnReply(replyInfo);
240 break;
241 }
242 case HEADERS:
243 {
244 HeadersFrame headers = (HeadersFrame)frame;
245 updateCloseState(headers.isClose(), false);
246 HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(), headers.isClose(), headers.isResetCompression());
247 notifyOnHeaders(headersInfo);
248 break;
249 }
250 case RST_STREAM:
251 {
252 reset = true;
253 break;
254 }
255 default:
256 {
257 throw new IllegalStateException();
258 }
259 }
260 }
261
262 @Override
263 public void process(DataInfo dataInfo)
264 {
265 notIdle();
266
267
268 if (isRemotelyClosed())
269 {
270 if (LOG.isDebugEnabled())
271 LOG.debug("Stream is remotely closed, ignoring {}", dataInfo);
272 return;
273 }
274
275 if (!canReceive())
276 {
277 if (LOG.isDebugEnabled())
278 LOG.debug("Protocol error receiving {}, resetting", dataInfo);
279 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), Callback.Adapter.INSTANCE);
280 return;
281 }
282
283 updateCloseState(dataInfo.isClose(), false);
284 notifyOnData(dataInfo);
285 }
286
287 @Override
288 public void succeeded()
289 {
290 if (promise != null)
291 promise.succeeded(this);
292 }
293
294 @Override
295 public void failed(Throwable x)
296 {
297 if (promise != null)
298 promise.failed(x);
299 }
300
301 private void notifyOnReply(ReplyInfo replyInfo)
302 {
303 final StreamFrameListener listener = this.listener;
304 try
305 {
306 if (listener != null)
307 {
308 if (LOG.isDebugEnabled())
309 LOG.debug("Invoking reply callback with {} on listener {}", replyInfo, listener);
310 listener.onReply(this, replyInfo);
311 }
312 }
313 catch (Exception x)
314 {
315 LOG.info("Exception while notifying listener " + listener, x);
316 }
317 catch (Error x)
318 {
319 LOG.info("Exception while notifying listener " + listener, x);
320 throw x;
321 }
322 }
323
324 private void notifyOnHeaders(HeadersInfo headersInfo)
325 {
326 final StreamFrameListener listener = this.listener;
327 try
328 {
329 if (listener != null)
330 {
331 if (LOG.isDebugEnabled())
332 LOG.debug("Invoking headers callback with {} on listener {}", headersInfo, listener);
333 listener.onHeaders(this, headersInfo);
334 }
335 }
336 catch (Exception x)
337 {
338 LOG.info("Exception while notifying listener " + listener, x);
339 }
340 catch (Error x)
341 {
342 LOG.info("Exception while notifying listener " + listener, x);
343 throw x;
344 }
345 }
346
347 private void notifyOnData(DataInfo dataInfo)
348 {
349 final StreamFrameListener listener = this.listener;
350 try
351 {
352 if (listener != null)
353 {
354 if (LOG.isDebugEnabled())
355 LOG.debug("Invoking data callback with {} on listener {}", dataInfo, listener);
356 listener.onData(this, dataInfo);
357 if (LOG.isDebugEnabled())
358 LOG.debug("Invoked data callback with {} on listener {}", dataInfo, listener);
359 }
360 }
361 catch (Exception x)
362 {
363 LOG.info("Exception while notifying listener " + listener, x);
364 }
365 catch (Error x)
366 {
367 LOG.info("Exception while notifying listener " + listener, x);
368 throw x;
369 }
370 }
371
372 @Override
373 public Stream push(PushInfo pushInfo) throws InterruptedException, ExecutionException, TimeoutException
374 {
375 FuturePromise<Stream> result = new FuturePromise<>();
376 push(pushInfo, result);
377 if (pushInfo.getTimeout() > 0)
378 return result.get(pushInfo.getTimeout(), pushInfo.getUnit());
379 else
380 return result.get();
381 }
382
383 @Override
384 public void push(PushInfo pushInfo, Promise<Stream> promise)
385 {
386 notIdle();
387 if (isClosed() || isReset())
388 {
389 close();
390 promise.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED,
391 "Stream: " + this + " already closed or reset!"));
392 return;
393 }
394 PushSynInfo pushSynInfo = new PushSynInfo(getId(), pushInfo);
395 session.syn(pushSynInfo, null, new StreamPromise(promise));
396 }
397
398 @Override
399 public void reply(ReplyInfo replyInfo) throws InterruptedException, ExecutionException, TimeoutException
400 {
401 FutureCallback result = new FutureCallback();
402 reply(replyInfo, result);
403 if (replyInfo.getTimeout() > 0)
404 result.get(replyInfo.getTimeout(), replyInfo.getUnit());
405 else
406 result.get();
407 }
408
409 @Override
410 public void reply(ReplyInfo replyInfo, Callback callback)
411 {
412 notIdle();
413 if (isUnidirectional())
414 {
415 close();
416 throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
417 }
418 openState = OpenState.REPLY_SENT;
419 updateCloseState(replyInfo.isClose(), true);
420 SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
421 session.control(this, frame, replyInfo.getTimeout(), replyInfo.getUnit(), new StreamCallback(callback));
422 }
423
424 @Override
425 public void data(DataInfo dataInfo) throws InterruptedException, ExecutionException, TimeoutException
426 {
427 FutureCallback result = new FutureCallback();
428 data(dataInfo, result);
429 if (dataInfo.getTimeout() > 0)
430 result.get(dataInfo.getTimeout(), dataInfo.getUnit());
431 else
432 result.get();
433 }
434
435 @Override
436 public void data(DataInfo dataInfo, Callback callback)
437 {
438 notIdle();
439 if (!canSend())
440 {
441 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback());
442 throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame");
443 }
444 if (isLocallyClosed())
445 {
446 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback());
447 throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a locally closed stream");
448 }
449
450
451
452 session.data(this, dataInfo, dataInfo.getTimeout(), dataInfo.getUnit(), new StreamCallback(callback));
453 }
454
455 @Override
456 public void headers(HeadersInfo headersInfo) throws InterruptedException, ExecutionException, TimeoutException
457 {
458 FutureCallback result = new FutureCallback();
459 headers(headersInfo, result);
460 if (headersInfo.getTimeout() > 0)
461 result.get(headersInfo.getTimeout(), headersInfo.getUnit());
462 else
463 result.get();
464 }
465
466 @Override
467 public void headers(HeadersInfo headersInfo, Callback callback)
468 {
469 notIdle();
470 if (!canSend())
471 {
472 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback());
473 throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame");
474 }
475 if (isLocallyClosed())
476 {
477 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback());
478 throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream");
479 }
480
481 updateCloseState(headersInfo.isClose(), true);
482 HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
483 session.control(this, frame, headersInfo.getTimeout(), headersInfo.getUnit(), new StreamCallback(callback));
484 }
485
486 @Override
487 public boolean isUnidirectional()
488 {
489 return associatedStream != null;
490 }
491
492 @Override
493 public boolean isReset()
494 {
495 return reset;
496 }
497
498 @Override
499 public boolean isHalfClosed()
500 {
501 CloseState closeState = this.closeState;
502 return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
503 }
504
505 @Override
506 public boolean isClosed()
507 {
508 return closeState == CloseState.CLOSED;
509 }
510
511 private boolean isLocallyClosed()
512 {
513 CloseState closeState = this.closeState;
514 return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.CLOSED;
515 }
516
517 private boolean isRemotelyClosed()
518 {
519 CloseState closeState = this.closeState;
520 return closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
521 }
522
523 @Override
524 public String toString()
525 {
526 return String.format("stream=%d v%d windowSize=%d reset=%s prio=%d %s %s", getId(), session.getVersion(),
527 getWindowSize(), isReset(), priority, openState, closeState);
528 }
529
530 private boolean canSend()
531 {
532 OpenState openState = this.openState;
533 return openState == OpenState.SYN_SENT || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
534 }
535
536 private boolean canReceive()
537 {
538 OpenState openState = this.openState;
539 return openState == OpenState.SYN_RECV || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
540 }
541
542 private enum OpenState
543 {
544 SYN_SENT, SYN_RECV, REPLY_SENT, REPLY_RECV
545 }
546
547 private enum CloseState
548 {
549 OPENED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
550 }
551
552 private class StreamCallback implements Callback
553 {
554 private final Callback callback;
555
556 private StreamCallback()
557 {
558 this(Callback.Adapter.INSTANCE);
559 }
560
561 private StreamCallback(Callback callback)
562 {
563 this.callback = callback;
564 }
565
566 @Override
567 public void succeeded()
568 {
569 callback.succeeded();
570 }
571
572 @Override
573 public void failed(Throwable x)
574 {
575 close();
576 callback.failed(x);
577 }
578 }
579
580 private class StreamPromise implements Promise<Stream>
581 {
582 private final Promise<Stream> promise;
583
584 public StreamPromise(Promise<Stream> promise)
585 {
586 this.promise = promise;
587 }
588
589 @Override
590 public void succeeded(Stream result)
591 {
592 promise.succeeded(result);
593 }
594
595 @Override
596 public void failed(Throwable x)
597 {
598 close();
599 promise.failed(x);
600 }
601 }
602 }