1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy;
20
21 import java.util.Collections;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.TimeoutException;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import org.eclipse.jetty.spdy.api.DataInfo;
30 import org.eclipse.jetty.spdy.api.HeadersInfo;
31 import org.eclipse.jetty.spdy.api.PushInfo;
32 import org.eclipse.jetty.spdy.api.ReplyInfo;
33 import org.eclipse.jetty.spdy.api.RstInfo;
34 import org.eclipse.jetty.spdy.api.Stream;
35 import org.eclipse.jetty.spdy.api.StreamFrameListener;
36 import org.eclipse.jetty.spdy.api.StreamStatus;
37 import org.eclipse.jetty.spdy.frames.ControlFrame;
38 import org.eclipse.jetty.spdy.frames.HeadersFrame;
39 import org.eclipse.jetty.spdy.frames.SynReplyFrame;
40 import org.eclipse.jetty.util.Callback;
41 import org.eclipse.jetty.util.FutureCallback;
42 import org.eclipse.jetty.util.FuturePromise;
43 import org.eclipse.jetty.util.Promise;
44 import org.eclipse.jetty.util.log.Log;
45 import org.eclipse.jetty.util.log.Logger;
46
47 public class StandardStream implements IStream
48 {
49 private static final Logger LOG = Log.getLogger(Stream.class);
50 private final Map<String, Object> attributes = new ConcurrentHashMap<>();
51 private final int id;
52 private final byte priority;
53 private final ISession session;
54 private final IStream associatedStream;
55 private final Promise<Stream> promise;
56 private final AtomicInteger windowSize = new AtomicInteger();
57 private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>());
58 private volatile StreamFrameListener listener;
59 private volatile OpenState openState = OpenState.SYN_SENT;
60 private volatile CloseState closeState = CloseState.OPENED;
61 private volatile boolean reset = false;
62
63 public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Promise<Stream> promise)
64 {
65 this.id = id;
66 this.priority = priority;
67 this.session = session;
68 this.associatedStream = associatedStream;
69 this.promise = promise;
70 }
71
72 @Override
73 public int getId()
74 {
75 return id;
76 }
77
78 @Override
79 public IStream getAssociatedStream()
80 {
81 return associatedStream;
82 }
83
84 @Override
85 public Set<Stream> getPushedStreams()
86 {
87 return pushedStreams;
88 }
89
90 @Override
91 public void associate(IStream stream)
92 {
93 pushedStreams.add(stream);
94 }
95
96 @Override
97 public void disassociate(IStream stream)
98 {
99 pushedStreams.remove(stream);
100 }
101
102 @Override
103 public byte getPriority()
104 {
105 return priority;
106 }
107
108 @Override
109 public int getWindowSize()
110 {
111 return windowSize.get();
112 }
113
114 @Override
115 public void updateWindowSize(int delta)
116 {
117 int size = windowSize.addAndGet(delta);
118 LOG.debug("Updated window size {} -> {} for {}", size - delta, size, this);
119 }
120
121 @Override
122 public ISession getSession()
123 {
124 return session;
125 }
126
127 @Override
128 public Object getAttribute(String key)
129 {
130 return attributes.get(key);
131 }
132
133 @Override
134 public void setAttribute(String key, Object value)
135 {
136 attributes.put(key, value);
137 }
138
139 @Override
140 public Object removeAttribute(String key)
141 {
142 return attributes.remove(key);
143 }
144
145 @Override
146 public void setStreamFrameListener(StreamFrameListener listener)
147 {
148 this.listener = listener;
149 }
150
151 @Override
152 public StreamFrameListener getStreamFrameListener()
153 {
154 return listener;
155 }
156
157 @Override
158 public void updateCloseState(boolean close, boolean local)
159 {
160 LOG.debug("{} close={} local={}", this, close, local);
161 if (close)
162 {
163 switch (closeState)
164 {
165 case OPENED:
166 {
167 closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
168 break;
169 }
170 case LOCALLY_CLOSED:
171 {
172 if (local)
173 throw new IllegalStateException();
174 else
175 closeState = CloseState.CLOSED;
176 break;
177 }
178 case REMOTELY_CLOSED:
179 {
180 if (local)
181 closeState = CloseState.CLOSED;
182 else
183 throw new IllegalStateException();
184 break;
185 }
186 default:
187 {
188 LOG.warn("Already CLOSED! {} local={}", this, local);
189 }
190 }
191 }
192 }
193
194 @Override
195 public void process(ControlFrame frame)
196 {
197 switch (frame.getType())
198 {
199 case SYN_STREAM:
200 {
201 openState = OpenState.SYN_RECV;
202 break;
203 }
204 case SYN_REPLY:
205 {
206 openState = OpenState.REPLY_RECV;
207 SynReplyFrame synReply = (SynReplyFrame)frame;
208 updateCloseState(synReply.isClose(), false);
209 ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(), synReply.isClose());
210 notifyOnReply(replyInfo);
211 break;
212 }
213 case HEADERS:
214 {
215 HeadersFrame headers = (HeadersFrame)frame;
216 updateCloseState(headers.isClose(), false);
217 HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(), headers.isClose(), headers.isResetCompression());
218 notifyOnHeaders(headersInfo);
219 break;
220 }
221 case RST_STREAM:
222 {
223 reset = true;
224 break;
225 }
226 default:
227 {
228 throw new IllegalStateException();
229 }
230 }
231 session.flush();
232 }
233
234 @Override
235 public void process(DataInfo dataInfo)
236 {
237
238
239 if (isRemotelyClosed())
240 {
241 LOG.debug("Stream is remotely closed, ignoring {}", dataInfo);
242 return;
243 }
244
245 if (!canReceive())
246 {
247 LOG.debug("Protocol error receiving {}, resetting", dataInfo);
248 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
249 return;
250 }
251
252 updateCloseState(dataInfo.isClose(), false);
253 notifyOnData(dataInfo);
254 session.flush();
255 }
256
257 @Override
258 public void succeeded()
259 {
260 if (promise != null)
261 promise.succeeded(this);
262 }
263
264 @Override
265 public void failed(Throwable x)
266 {
267 if (promise != null)
268 promise.failed(x);
269 }
270
271 private void notifyOnReply(ReplyInfo replyInfo)
272 {
273 final StreamFrameListener listener = this.listener;
274 try
275 {
276 if (listener != null)
277 {
278 LOG.debug("Invoking reply callback with {} on listener {}", replyInfo, listener);
279 listener.onReply(this, replyInfo);
280 }
281 }
282 catch (Exception x)
283 {
284 LOG.info("Exception while notifying listener " + listener, x);
285 }
286 catch (Error x)
287 {
288 LOG.info("Exception while notifying listener " + listener, x);
289 throw x;
290 }
291 }
292
293 private void notifyOnHeaders(HeadersInfo headersInfo)
294 {
295 final StreamFrameListener listener = this.listener;
296 try
297 {
298 if (listener != null)
299 {
300 LOG.debug("Invoking headers callback with {} on listener {}", headersInfo, listener);
301 listener.onHeaders(this, headersInfo);
302 }
303 }
304 catch (Exception x)
305 {
306 LOG.info("Exception while notifying listener " + listener, x);
307 }
308 catch (Error x)
309 {
310 LOG.info("Exception while notifying listener " + listener, x);
311 throw x;
312 }
313 }
314
315 private void notifyOnData(DataInfo dataInfo)
316 {
317 final StreamFrameListener listener = this.listener;
318 try
319 {
320 if (listener != null)
321 {
322 LOG.debug("Invoking data callback with {} on listener {}", dataInfo, listener);
323 listener.onData(this, dataInfo);
324 LOG.debug("Invoked data callback with {} on listener {}", dataInfo, listener);
325 }
326 }
327 catch (Exception x)
328 {
329 LOG.info("Exception while notifying listener " + listener, x);
330 }
331 catch (Error x)
332 {
333 LOG.info("Exception while notifying listener " + listener, x);
334 throw x;
335 }
336 }
337
338 @Override
339 public Stream push(PushInfo pushInfo) throws InterruptedException, ExecutionException, TimeoutException
340 {
341 FuturePromise<Stream> result = new FuturePromise<>();
342 push(pushInfo, result);
343 if (pushInfo.getTimeout() > 0)
344 return result.get(pushInfo.getTimeout(), pushInfo.getUnit());
345 else
346 return result.get();
347 }
348
349 @Override
350 public void push(PushInfo pushInfo, Promise<Stream> promise)
351 {
352 if (isClosed() || isReset())
353 {
354 promise.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED,
355 "Stream: " + this + " already closed or reset!"));
356 return;
357 }
358 PushSynInfo pushSynInfo = new PushSynInfo(getId(), pushInfo);
359 session.syn(pushSynInfo, null, promise);
360 }
361
362 @Override
363 public void reply(ReplyInfo replyInfo) throws InterruptedException, ExecutionException, TimeoutException
364 {
365 FutureCallback result = new FutureCallback();
366 reply(replyInfo, result);
367 if (replyInfo.getTimeout() > 0)
368 result.get(replyInfo.getTimeout(), replyInfo.getUnit());
369 else
370 result.get();
371 }
372
373 @Override
374 public void reply(ReplyInfo replyInfo, Callback callback)
375 {
376 if (isUnidirectional())
377 throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
378 openState = OpenState.REPLY_SENT;
379 updateCloseState(replyInfo.isClose(), true);
380 SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
381 session.control(this, frame, replyInfo.getTimeout(), replyInfo.getUnit(), callback);
382 }
383
384 @Override
385 public void data(DataInfo dataInfo) throws InterruptedException, ExecutionException, TimeoutException
386 {
387 FutureCallback result = new FutureCallback();
388 data(dataInfo, result);
389 if (dataInfo.getTimeout() > 0)
390 result.get(dataInfo.getTimeout(), dataInfo.getUnit());
391 else
392 result.get();
393 }
394
395 @Override
396 public void data(DataInfo dataInfo, Callback callback)
397 {
398 if (!canSend())
399 {
400 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
401 throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame");
402 }
403 if (isLocallyClosed())
404 {
405 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
406 throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a locally closed stream");
407 }
408
409
410
411 session.data(this, dataInfo, dataInfo.getTimeout(), dataInfo.getUnit(), callback);
412 }
413
414 @Override
415 public void headers(HeadersInfo headersInfo) throws InterruptedException, ExecutionException, TimeoutException
416 {
417 FutureCallback result = new FutureCallback();
418 headers(headersInfo, result);
419 if (headersInfo.getTimeout() > 0)
420 result.get(headersInfo.getTimeout(), headersInfo.getUnit());
421 else
422 result.get();
423 }
424
425 @Override
426 public void headers(HeadersInfo headersInfo, Callback callback)
427 {
428 if (!canSend())
429 {
430 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
431 throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame");
432 }
433 if (isLocallyClosed())
434 {
435 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
436 throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream");
437 }
438
439 updateCloseState(headersInfo.isClose(), true);
440 HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
441 session.control(this, frame, headersInfo.getTimeout(), headersInfo.getUnit(), callback);
442 }
443
444 @Override
445 public boolean isUnidirectional()
446 {
447 return associatedStream != null;
448 }
449
450 @Override
451 public boolean isReset()
452 {
453 return reset;
454 }
455
456 @Override
457 public boolean isHalfClosed()
458 {
459 CloseState closeState = this.closeState;
460 return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
461 }
462
463 @Override
464 public boolean isClosed()
465 {
466 return closeState == CloseState.CLOSED;
467 }
468
469 private boolean isLocallyClosed()
470 {
471 CloseState closeState = this.closeState;
472 return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.CLOSED;
473 }
474
475 private boolean isRemotelyClosed()
476 {
477 CloseState closeState = this.closeState;
478 return closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
479 }
480
481 @Override
482 public String toString()
483 {
484 return String.format("stream=%d v%d windowSize=%d reset=%s prio=%d %s %s", getId(), session.getVersion(),
485 getWindowSize(), isReset(), priority, openState, closeState);
486 }
487
488 private boolean canSend()
489 {
490 OpenState openState = this.openState;
491 return openState == OpenState.SYN_SENT || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
492 }
493
494 private boolean canReceive()
495 {
496 OpenState openState = this.openState;
497 return openState == OpenState.SYN_RECV || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
498 }
499
500 private enum OpenState
501 {
502 SYN_SENT, SYN_RECV, REPLY_SENT, REPLY_RECV
503 }
504
505 private enum CloseState
506 {
507 OPENED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
508 }
509 }