1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy;
20
21 import java.util.Collections;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.Future;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 import org.eclipse.jetty.spdy.api.DataInfo;
30 import org.eclipse.jetty.spdy.api.Handler;
31 import org.eclipse.jetty.spdy.api.HeadersInfo;
32 import org.eclipse.jetty.spdy.api.ReplyInfo;
33 import org.eclipse.jetty.spdy.api.RstInfo;
34 import org.eclipse.jetty.spdy.api.Stream;
35 import org.eclipse.jetty.spdy.api.StreamFrameListener;
36 import org.eclipse.jetty.spdy.api.StreamStatus;
37 import org.eclipse.jetty.spdy.api.SynInfo;
38 import org.eclipse.jetty.spdy.frames.ControlFrame;
39 import org.eclipse.jetty.spdy.frames.HeadersFrame;
40 import org.eclipse.jetty.spdy.frames.SynReplyFrame;
41 import org.eclipse.jetty.util.log.Log;
42 import org.eclipse.jetty.util.log.Logger;
43
44 public class StandardStream implements IStream
45 {
46 private static final Logger logger = Log.getLogger(Stream.class);
47 private final Map<String, Object> attributes = new ConcurrentHashMap<>();
48 private final int id;
49 private final byte priority;
50 private final ISession session;
51 private final IStream associatedStream;
52 private final AtomicInteger windowSize = new AtomicInteger();
53 private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>());
54 private volatile StreamFrameListener listener;
55 private volatile OpenState openState = OpenState.SYN_SENT;
56 private volatile CloseState closeState = CloseState.OPENED;
57 private volatile boolean reset = false;
58
59 public StandardStream(int id, byte priority, ISession session, IStream associatedStream)
60 {
61 this.id = id;
62 this.priority = priority;
63 this.session = session;
64 this.associatedStream = associatedStream;
65 }
66
67 @Override
68 public int getId()
69 {
70 return id;
71 }
72
73 @Override
74 public IStream getAssociatedStream()
75 {
76 return associatedStream;
77 }
78
79 @Override
80 public Set<Stream> getPushedStreams()
81 {
82 return pushedStreams;
83 }
84
85 @Override
86 public void associate(IStream stream)
87 {
88 pushedStreams.add(stream);
89 }
90
91 @Override
92 public void disassociate(IStream stream)
93 {
94 pushedStreams.remove(stream);
95 }
96
97 @Override
98 public byte getPriority()
99 {
100 return priority;
101 }
102
103 @Override
104 public int getWindowSize()
105 {
106 return windowSize.get();
107 }
108
109 @Override
110 public void updateWindowSize(int delta)
111 {
112 int size = windowSize.addAndGet(delta);
113 logger.debug("Updated window size {} -> {} for {}", size - delta, size, this);
114 }
115
116 @Override
117 public ISession getSession()
118 {
119 return session;
120 }
121
122 @Override
123 public Object getAttribute(String key)
124 {
125 return attributes.get(key);
126 }
127
128 @Override
129 public void setAttribute(String key, Object value)
130 {
131 attributes.put(key,value);
132 }
133
134 @Override
135 public Object removeAttribute(String key)
136 {
137 return attributes.remove(key);
138 }
139
140 @Override
141 public void setStreamFrameListener(StreamFrameListener listener)
142 {
143 this.listener = listener;
144 }
145
146 public StreamFrameListener getStreamFrameListener()
147 {
148 return listener;
149 }
150
151 @Override
152 public void updateCloseState(boolean close, boolean local)
153 {
154 if (close)
155 {
156 switch (closeState)
157 {
158 case OPENED:
159 {
160 closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
161 break;
162 }
163 case LOCALLY_CLOSED:
164 {
165 if (local)
166 throw new IllegalStateException();
167 else
168 closeState = CloseState.CLOSED;
169 break;
170 }
171 case REMOTELY_CLOSED:
172 {
173 if (local)
174 closeState = CloseState.CLOSED;
175 else
176 throw new IllegalStateException();
177 break;
178 }
179 default:
180 {
181 throw new IllegalStateException();
182 }
183 }
184 }
185 }
186
187 @Override
188 public void process(ControlFrame frame)
189 {
190 switch (frame.getType())
191 {
192 case SYN_STREAM:
193 {
194 openState = OpenState.SYN_RECV;
195 break;
196 }
197 case SYN_REPLY:
198 {
199 openState = OpenState.REPLY_RECV;
200 SynReplyFrame synReply = (SynReplyFrame)frame;
201 updateCloseState(synReply.isClose(), false);
202 ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(), synReply.isClose());
203 notifyOnReply(replyInfo);
204 break;
205 }
206 case HEADERS:
207 {
208 HeadersFrame headers = (HeadersFrame)frame;
209 updateCloseState(headers.isClose(), false);
210 HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(), headers.isClose(), headers.isResetCompression());
211 notifyOnHeaders(headersInfo);
212 break;
213 }
214 case RST_STREAM:
215 {
216 reset = true;
217 break;
218 }
219 default:
220 {
221 throw new IllegalStateException();
222 }
223 }
224 session.flush();
225 }
226
227 @Override
228 public void process(DataInfo dataInfo)
229 {
230
231
232 if (isRemotelyClosed())
233 {
234 logger.debug("Stream is remotely closed, ignoring {}", dataInfo);
235 return;
236 }
237
238 if (!canReceive())
239 {
240 logger.debug("Protocol error receiving {}, resetting" + dataInfo);
241 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
242 return;
243 }
244
245 updateCloseState(dataInfo.isClose(), false);
246 notifyOnData(dataInfo);
247 session.flush();
248 }
249
250 private void notifyOnReply(ReplyInfo replyInfo)
251 {
252 final StreamFrameListener listener = this.listener;
253 try
254 {
255 if (listener != null)
256 {
257 logger.debug("Invoking reply callback with {} on listener {}", replyInfo, listener);
258 listener.onReply(this, replyInfo);
259 }
260 }
261 catch (Exception x)
262 {
263 logger.info("Exception while notifying listener " + listener, x);
264 }
265 catch (Error x)
266 {
267 logger.info("Exception while notifying listener " + listener, x);
268 throw x;
269 }
270 }
271
272 private void notifyOnHeaders(HeadersInfo headersInfo)
273 {
274 final StreamFrameListener listener = this.listener;
275 try
276 {
277 if (listener != null)
278 {
279 logger.debug("Invoking headers callback with {} on listener {}", headersInfo, listener);
280 listener.onHeaders(this, headersInfo);
281 }
282 }
283 catch (Exception x)
284 {
285 logger.info("Exception while notifying listener " + listener, x);
286 }
287 catch (Error x)
288 {
289 logger.info("Exception while notifying listener " + listener, x);
290 throw x;
291 }
292 }
293
294 private void notifyOnData(DataInfo dataInfo)
295 {
296 final StreamFrameListener listener = this.listener;
297 try
298 {
299 if (listener != null)
300 {
301 logger.debug("Invoking data callback with {} on listener {}", dataInfo, listener);
302 listener.onData(this, dataInfo);
303 logger.debug("Invoked data callback with {} on listener {}", dataInfo, listener);
304 }
305 }
306 catch (Exception x)
307 {
308 logger.info("Exception while notifying listener " + listener, x);
309 }
310 catch (Error x)
311 {
312 logger.info("Exception while notifying listener " + listener, x);
313 throw x;
314 }
315 }
316
317 @Override
318 public Future<Stream> syn(SynInfo synInfo)
319 {
320 Promise<Stream> result = new Promise<>();
321 syn(synInfo,0,TimeUnit.MILLISECONDS,result);
322 return result;
323 }
324
325 @Override
326 public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler)
327 {
328 if (isClosed() || isReset())
329 {
330 handler.failed(this, new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED));
331 return;
332 }
333 PushSynInfo pushSynInfo = new PushSynInfo(getId(), synInfo);
334 session.syn(pushSynInfo, null, timeout, unit, handler);
335 }
336
337 @Override
338 public Future<Void> reply(ReplyInfo replyInfo)
339 {
340 Promise<Void> result = new Promise<>();
341 reply(replyInfo,0,TimeUnit.MILLISECONDS,result);
342 return result;
343 }
344
345 @Override
346 public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler)
347 {
348 if (isUnidirectional())
349 throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
350 openState = OpenState.REPLY_SENT;
351 updateCloseState(replyInfo.isClose(), true);
352 SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
353 session.control(this, frame, timeout, unit, handler, null);
354 }
355
356 @Override
357 public Future<Void> data(DataInfo dataInfo)
358 {
359 Promise<Void> result = new Promise<>();
360 data(dataInfo,0,TimeUnit.MILLISECONDS,result);
361 return result;
362 }
363
364 @Override
365 public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
366 {
367 if (!canSend())
368 {
369 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
370 throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame");
371 }
372 if (isLocallyClosed())
373 {
374 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
375 throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a closed stream");
376 }
377
378
379
380 session.data(this, dataInfo, timeout, unit, handler, null);
381 }
382
383 @Override
384 public Future<Void> headers(HeadersInfo headersInfo)
385 {
386 Promise<Void> result = new Promise<>();
387 headers(headersInfo,0,TimeUnit.MILLISECONDS,result);
388 return result;
389 }
390
391 @Override
392 public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
393 {
394 if (!canSend())
395 {
396 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
397 throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame");
398 }
399 if (isLocallyClosed())
400 {
401 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
402 throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream");
403 }
404
405 updateCloseState(headersInfo.isClose(), true);
406 HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
407 session.control(this, frame, timeout, unit, handler, null);
408 }
409
410 @Override
411 public boolean isUnidirectional()
412 {
413 return associatedStream != null;
414 }
415
416 @Override
417 public boolean isReset()
418 {
419 return reset;
420 }
421
422 @Override
423 public boolean isHalfClosed()
424 {
425 CloseState closeState = this.closeState;
426 return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
427 }
428
429 @Override
430 public boolean isClosed()
431 {
432 return closeState == CloseState.CLOSED;
433 }
434
435 private boolean isLocallyClosed()
436 {
437 CloseState closeState = this.closeState;
438 return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.CLOSED;
439 }
440
441 private boolean isRemotelyClosed()
442 {
443 CloseState closeState = this.closeState;
444 return closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
445 }
446
447 @Override
448 public String toString()
449 {
450 return String.format("stream=%d v%d windowSize=%db reset=%s %s %s", getId(), session.getVersion(), getWindowSize(), isReset(), openState, closeState);
451 }
452
453 private boolean canSend()
454 {
455 OpenState openState = this.openState;
456 return openState == OpenState.SYN_SENT || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
457 }
458
459 private boolean canReceive()
460 {
461 OpenState openState = this.openState;
462 return openState == OpenState.SYN_RECV || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
463 }
464
465 private enum OpenState
466 {
467 SYN_SENT, SYN_RECV, REPLY_SENT, REPLY_RECV
468 }
469
470 private enum CloseState
471 {
472 OPENED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
473 }
474 }