1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy;
20
21 import java.util.Collections;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.TimeoutException;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import org.eclipse.jetty.io.IdleTimeout;
30 import org.eclipse.jetty.spdy.api.DataInfo;
31 import org.eclipse.jetty.spdy.api.HeadersInfo;
32 import org.eclipse.jetty.spdy.api.PushInfo;
33 import org.eclipse.jetty.spdy.api.ReplyInfo;
34 import org.eclipse.jetty.spdy.api.RstInfo;
35 import org.eclipse.jetty.spdy.api.Stream;
36 import org.eclipse.jetty.spdy.api.StreamFrameListener;
37 import org.eclipse.jetty.spdy.api.StreamStatus;
38 import org.eclipse.jetty.spdy.frames.ControlFrame;
39 import org.eclipse.jetty.spdy.frames.HeadersFrame;
40 import org.eclipse.jetty.spdy.frames.SynReplyFrame;
41 import org.eclipse.jetty.util.Callback;
42 import org.eclipse.jetty.util.FutureCallback;
43 import org.eclipse.jetty.util.FuturePromise;
44 import org.eclipse.jetty.util.Promise;
45 import org.eclipse.jetty.util.log.Log;
46 import org.eclipse.jetty.util.log.Logger;
47 import org.eclipse.jetty.util.thread.Scheduler;
48
49 public class StandardStream extends IdleTimeout implements IStream
50 {
51 private static final Logger LOG = Log.getLogger(Stream.class);
52 private final Map<String, Object> attributes = new ConcurrentHashMap<>();
53 private final int id;
54 private final byte priority;
55 private final ISession session;
56 private final IStream associatedStream;
57 private final Promise<Stream> promise;
58 private final AtomicInteger windowSize = new AtomicInteger();
59 private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>());
60 private volatile StreamFrameListener listener;
61 private volatile OpenState openState = OpenState.SYN_SENT;
62 private volatile CloseState closeState = CloseState.OPENED;
63 private volatile boolean reset = false;
64
65 public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Scheduler scheduler, Promise<Stream> promise)
66 {
67 super(scheduler);
68 this.id = id;
69 this.priority = priority;
70 this.session = session;
71 this.associatedStream = associatedStream;
72 this.promise = promise;
73 }
74
75 @Override
76 public int getId()
77 {
78 return id;
79 }
80
81 @Override
82 public IStream getAssociatedStream()
83 {
84 return associatedStream;
85 }
86
87 @Override
88 public Set<Stream> getPushedStreams()
89 {
90 return pushedStreams;
91 }
92
93 @Override
94 public void associate(IStream stream)
95 {
96 pushedStreams.add(stream);
97 }
98
99 @Override
100 public void disassociate(IStream stream)
101 {
102 pushedStreams.remove(stream);
103 }
104
105 @Override
106 public byte getPriority()
107 {
108 return priority;
109 }
110
111 @Override
112 protected void onIdleExpired(TimeoutException timeout)
113 {
114 StreamFrameListener listener = this.listener;
115 if (listener != null)
116 listener.onFailure(this, timeout);
117 }
118
119 @Override
120 public boolean isOpen()
121 {
122 return !isClosed();
123 }
124
125 @Override
126 public int getWindowSize()
127 {
128 return windowSize.get();
129 }
130
131 @Override
132 public void updateWindowSize(int delta)
133 {
134 int size = windowSize.addAndGet(delta);
135 LOG.debug("Updated window size {} -> {} for {}", size - delta, size, this);
136 }
137
138 @Override
139 public ISession getSession()
140 {
141 return session;
142 }
143
144 @Override
145 public Object getAttribute(String key)
146 {
147 return attributes.get(key);
148 }
149
150 @Override
151 public void setAttribute(String key, Object value)
152 {
153 attributes.put(key, value);
154 }
155
156 @Override
157 public Object removeAttribute(String key)
158 {
159 return attributes.remove(key);
160 }
161
162 @Override
163 public void setStreamFrameListener(StreamFrameListener listener)
164 {
165 this.listener = listener;
166 }
167
168 @Override
169 public StreamFrameListener getStreamFrameListener()
170 {
171 return listener;
172 }
173
174 @Override
175 public void updateCloseState(boolean close, boolean local)
176 {
177 LOG.debug("{} close={} local={}", this, close, local);
178 if (close)
179 {
180 switch (closeState)
181 {
182 case OPENED:
183 {
184 closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
185 break;
186 }
187 case LOCALLY_CLOSED:
188 {
189 if (local)
190 throw new IllegalStateException();
191 else
192 closeState = CloseState.CLOSED;
193 break;
194 }
195 case REMOTELY_CLOSED:
196 {
197 if (local)
198 closeState = CloseState.CLOSED;
199 else
200 throw new IllegalStateException();
201 break;
202 }
203 default:
204 {
205 LOG.warn("Already CLOSED! {} local={}", this, local);
206 }
207 }
208 }
209 }
210
211 @Override
212 public void process(ControlFrame frame)
213 {
214 notIdle();
215 switch (frame.getType())
216 {
217 case SYN_STREAM:
218 {
219 openState = OpenState.SYN_RECV;
220 break;
221 }
222 case SYN_REPLY:
223 {
224 openState = OpenState.REPLY_RECV;
225 SynReplyFrame synReply = (SynReplyFrame)frame;
226 updateCloseState(synReply.isClose(), false);
227 ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(), synReply.isClose());
228 notifyOnReply(replyInfo);
229 break;
230 }
231 case HEADERS:
232 {
233 HeadersFrame headers = (HeadersFrame)frame;
234 updateCloseState(headers.isClose(), false);
235 HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(), headers.isClose(), headers.isResetCompression());
236 notifyOnHeaders(headersInfo);
237 break;
238 }
239 case RST_STREAM:
240 {
241 reset = true;
242 break;
243 }
244 default:
245 {
246 throw new IllegalStateException();
247 }
248 }
249 }
250
251 @Override
252 public void process(DataInfo dataInfo)
253 {
254 notIdle();
255
256
257 if (isRemotelyClosed())
258 {
259 LOG.debug("Stream is remotely closed, ignoring {}", dataInfo);
260 return;
261 }
262
263 if (!canReceive())
264 {
265 LOG.debug("Protocol error receiving {}, resetting", dataInfo);
266 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
267 return;
268 }
269
270 updateCloseState(dataInfo.isClose(), false);
271 notifyOnData(dataInfo);
272 }
273
274 @Override
275 public void succeeded()
276 {
277 if (promise != null)
278 promise.succeeded(this);
279 }
280
281 @Override
282 public void failed(Throwable x)
283 {
284 if (promise != null)
285 promise.failed(x);
286 }
287
288 private void notifyOnReply(ReplyInfo replyInfo)
289 {
290 final StreamFrameListener listener = this.listener;
291 try
292 {
293 if (listener != null)
294 {
295 LOG.debug("Invoking reply callback with {} on listener {}", replyInfo, listener);
296 listener.onReply(this, replyInfo);
297 }
298 }
299 catch (Exception x)
300 {
301 LOG.info("Exception while notifying listener " + listener, x);
302 }
303 catch (Error x)
304 {
305 LOG.info("Exception while notifying listener " + listener, x);
306 throw x;
307 }
308 }
309
310 private void notifyOnHeaders(HeadersInfo headersInfo)
311 {
312 final StreamFrameListener listener = this.listener;
313 try
314 {
315 if (listener != null)
316 {
317 LOG.debug("Invoking headers callback with {} on listener {}", headersInfo, listener);
318 listener.onHeaders(this, headersInfo);
319 }
320 }
321 catch (Exception x)
322 {
323 LOG.info("Exception while notifying listener " + listener, x);
324 }
325 catch (Error x)
326 {
327 LOG.info("Exception while notifying listener " + listener, x);
328 throw x;
329 }
330 }
331
332 private void notifyOnData(DataInfo dataInfo)
333 {
334 final StreamFrameListener listener = this.listener;
335 try
336 {
337 if (listener != null)
338 {
339 LOG.debug("Invoking data callback with {} on listener {}", dataInfo, listener);
340 listener.onData(this, dataInfo);
341 LOG.debug("Invoked data callback with {} on listener {}", dataInfo, listener);
342 }
343 }
344 catch (Exception x)
345 {
346 LOG.info("Exception while notifying listener " + listener, x);
347 }
348 catch (Error x)
349 {
350 LOG.info("Exception while notifying listener " + listener, x);
351 throw x;
352 }
353 }
354
355 @Override
356 public Stream push(PushInfo pushInfo) throws InterruptedException, ExecutionException, TimeoutException
357 {
358 FuturePromise<Stream> result = new FuturePromise<>();
359 push(pushInfo, result);
360 if (pushInfo.getTimeout() > 0)
361 return result.get(pushInfo.getTimeout(), pushInfo.getUnit());
362 else
363 return result.get();
364 }
365
366 @Override
367 public void push(PushInfo pushInfo, Promise<Stream> promise)
368 {
369 notIdle();
370 if (isClosed() || isReset())
371 {
372 promise.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED,
373 "Stream: " + this + " already closed or reset!"));
374 return;
375 }
376 PushSynInfo pushSynInfo = new PushSynInfo(getId(), pushInfo);
377 session.syn(pushSynInfo, null, promise);
378 }
379
380 @Override
381 public void reply(ReplyInfo replyInfo) throws InterruptedException, ExecutionException, TimeoutException
382 {
383 FutureCallback result = new FutureCallback();
384 reply(replyInfo, result);
385 if (replyInfo.getTimeout() > 0)
386 result.get(replyInfo.getTimeout(), replyInfo.getUnit());
387 else
388 result.get();
389 }
390
391 @Override
392 public void reply(ReplyInfo replyInfo, Callback callback)
393 {
394 notIdle();
395 if (isUnidirectional())
396 throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
397 openState = OpenState.REPLY_SENT;
398 updateCloseState(replyInfo.isClose(), true);
399 SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
400 session.control(this, frame, replyInfo.getTimeout(), replyInfo.getUnit(), callback);
401 }
402
403 @Override
404 public void data(DataInfo dataInfo) throws InterruptedException, ExecutionException, TimeoutException
405 {
406 FutureCallback result = new FutureCallback();
407 data(dataInfo, result);
408 if (dataInfo.getTimeout() > 0)
409 result.get(dataInfo.getTimeout(), dataInfo.getUnit());
410 else
411 result.get();
412 }
413
414 @Override
415 public void data(DataInfo dataInfo, Callback callback)
416 {
417 notIdle();
418 if (!canSend())
419 {
420 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
421 throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame");
422 }
423 if (isLocallyClosed())
424 {
425 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
426 throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a locally closed stream");
427 }
428
429
430
431 session.data(this, dataInfo, dataInfo.getTimeout(), dataInfo.getUnit(), callback);
432 }
433
434 @Override
435 public void headers(HeadersInfo headersInfo) throws InterruptedException, ExecutionException, TimeoutException
436 {
437 FutureCallback result = new FutureCallback();
438 headers(headersInfo, result);
439 if (headersInfo.getTimeout() > 0)
440 result.get(headersInfo.getTimeout(), headersInfo.getUnit());
441 else
442 result.get();
443 }
444
445 @Override
446 public void headers(HeadersInfo headersInfo, Callback callback)
447 {
448 notIdle();
449 if (!canSend())
450 {
451 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
452 throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame");
453 }
454 if (isLocallyClosed())
455 {
456 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
457 throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream");
458 }
459
460 updateCloseState(headersInfo.isClose(), true);
461 HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
462 session.control(this, frame, headersInfo.getTimeout(), headersInfo.getUnit(), callback);
463 }
464
465 @Override
466 public boolean isUnidirectional()
467 {
468 return associatedStream != null;
469 }
470
471 @Override
472 public boolean isReset()
473 {
474 return reset;
475 }
476
477 @Override
478 public boolean isHalfClosed()
479 {
480 CloseState closeState = this.closeState;
481 return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
482 }
483
484 @Override
485 public boolean isClosed()
486 {
487 return closeState == CloseState.CLOSED;
488 }
489
490 private boolean isLocallyClosed()
491 {
492 CloseState closeState = this.closeState;
493 return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.CLOSED;
494 }
495
496 private boolean isRemotelyClosed()
497 {
498 CloseState closeState = this.closeState;
499 return closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
500 }
501
502 @Override
503 public String toString()
504 {
505 return String.format("stream=%d v%d windowSize=%d reset=%s prio=%d %s %s", getId(), session.getVersion(),
506 getWindowSize(), isReset(), priority, openState, closeState);
507 }
508
509 private boolean canSend()
510 {
511 OpenState openState = this.openState;
512 return openState == OpenState.SYN_SENT || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
513 }
514
515 private boolean canReceive()
516 {
517 OpenState openState = this.openState;
518 return openState == OpenState.SYN_RECV || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
519 }
520
521 private enum OpenState
522 {
523 SYN_SENT, SYN_RECV, REPLY_SENT, REPLY_RECV
524 }
525
526 private enum CloseState
527 {
528 OPENED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
529 }
530 }