1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.spdy;
15
16 import java.util.Collections;
17 import java.util.Map;
18 import java.util.Set;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.Future;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.atomic.AtomicInteger;
23
24 import org.eclipse.jetty.spdy.api.DataInfo;
25 import org.eclipse.jetty.spdy.api.Handler;
26 import org.eclipse.jetty.spdy.api.HeadersInfo;
27 import org.eclipse.jetty.spdy.api.ReplyInfo;
28 import org.eclipse.jetty.spdy.api.RstInfo;
29 import org.eclipse.jetty.spdy.api.Stream;
30 import org.eclipse.jetty.spdy.api.StreamFrameListener;
31 import org.eclipse.jetty.spdy.api.StreamStatus;
32 import org.eclipse.jetty.spdy.api.SynInfo;
33 import org.eclipse.jetty.spdy.frames.ControlFrame;
34 import org.eclipse.jetty.spdy.frames.HeadersFrame;
35 import org.eclipse.jetty.spdy.frames.SynReplyFrame;
36 import org.eclipse.jetty.util.log.Log;
37 import org.eclipse.jetty.util.log.Logger;
38
39 public class StandardStream implements IStream
40 {
41 private static final Logger logger = Log.getLogger(Stream.class);
42 private final Map<String, Object> attributes = new ConcurrentHashMap<>();
43 private final int id;
44 private final byte priority;
45 private final ISession session;
46 private final IStream associatedStream;
47 private final AtomicInteger windowSize = new AtomicInteger();
48 private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>());
49 private volatile StreamFrameListener listener;
50 private volatile OpenState openState = OpenState.SYN_SENT;
51 private volatile CloseState closeState = CloseState.OPENED;
52 private volatile boolean reset = false;
53
54 public StandardStream(int id, byte priority, ISession session, IStream associatedStream)
55 {
56 this.id = id;
57 this.priority = priority;
58 this.session = session;
59 this.associatedStream = associatedStream;
60 }
61
62 @Override
63 public int getId()
64 {
65 return id;
66 }
67
68 @Override
69 public IStream getAssociatedStream()
70 {
71 return associatedStream;
72 }
73
74 @Override
75 public Set<Stream> getPushedStreams()
76 {
77 return pushedStreams;
78 }
79
80 @Override
81 public void associate(IStream stream)
82 {
83 pushedStreams.add(stream);
84 }
85
86 @Override
87 public void disassociate(IStream stream)
88 {
89 pushedStreams.remove(stream);
90 }
91
92 @Override
93 public byte getPriority()
94 {
95 return priority;
96 }
97
98 @Override
99 public int getWindowSize()
100 {
101 return windowSize.get();
102 }
103
104 @Override
105 public void updateWindowSize(int delta)
106 {
107 int size = windowSize.addAndGet(delta);
108 logger.debug("Updated window size {} -> {} for {}", size - delta, size, this);
109 }
110
111 @Override
112 public ISession getSession()
113 {
114 return session;
115 }
116
117 @Override
118 public Object getAttribute(String key)
119 {
120 return attributes.get(key);
121 }
122
123 @Override
124 public void setAttribute(String key, Object value)
125 {
126 attributes.put(key,value);
127 }
128
129 @Override
130 public Object removeAttribute(String key)
131 {
132 return attributes.remove(key);
133 }
134
135 @Override
136 public void setStreamFrameListener(StreamFrameListener listener)
137 {
138 this.listener = listener;
139 }
140
141 public StreamFrameListener getStreamFrameListener()
142 {
143 return listener;
144 }
145
146 @Override
147 public void updateCloseState(boolean close, boolean local)
148 {
149 if (close)
150 {
151 switch (closeState)
152 {
153 case OPENED:
154 {
155 closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
156 break;
157 }
158 case LOCALLY_CLOSED:
159 {
160 if (local)
161 throw new IllegalStateException();
162 else
163 closeState = CloseState.CLOSED;
164 break;
165 }
166 case REMOTELY_CLOSED:
167 {
168 if (local)
169 closeState = CloseState.CLOSED;
170 else
171 throw new IllegalStateException();
172 break;
173 }
174 default:
175 {
176 throw new IllegalStateException();
177 }
178 }
179 }
180 }
181
182 @Override
183 public void process(ControlFrame frame)
184 {
185 switch (frame.getType())
186 {
187 case SYN_STREAM:
188 {
189 openState = OpenState.SYN_RECV;
190 break;
191 }
192 case SYN_REPLY:
193 {
194 openState = OpenState.REPLY_RECV;
195 SynReplyFrame synReply = (SynReplyFrame)frame;
196 updateCloseState(synReply.isClose(), false);
197 ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(), synReply.isClose());
198 notifyOnReply(replyInfo);
199 break;
200 }
201 case HEADERS:
202 {
203 HeadersFrame headers = (HeadersFrame)frame;
204 updateCloseState(headers.isClose(), false);
205 HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(), headers.isClose(), headers.isResetCompression());
206 notifyOnHeaders(headersInfo);
207 break;
208 }
209 case RST_STREAM:
210 {
211 reset = true;
212 break;
213 }
214 default:
215 {
216 throw new IllegalStateException();
217 }
218 }
219 session.flush();
220 }
221
222 @Override
223 public void process(DataInfo dataInfo)
224 {
225
226
227 if (isRemotelyClosed())
228 {
229 logger.debug("Stream is remotely closed, ignoring {}", dataInfo);
230 return;
231 }
232
233 if (!canReceive())
234 {
235 logger.debug("Protocol error receiving {}, resetting" + dataInfo);
236 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
237 return;
238 }
239
240 updateCloseState(dataInfo.isClose(), false);
241 notifyOnData(dataInfo);
242 session.flush();
243 }
244
245 private void notifyOnReply(ReplyInfo replyInfo)
246 {
247 final StreamFrameListener listener = this.listener;
248 try
249 {
250 if (listener != null)
251 {
252 logger.debug("Invoking reply callback with {} on listener {}", replyInfo, listener);
253 listener.onReply(this, replyInfo);
254 }
255 }
256 catch (Exception x)
257 {
258 logger.info("Exception while notifying listener " + listener, x);
259 }
260 catch (Error x)
261 {
262 logger.info("Exception while notifying listener " + listener, x);
263 throw x;
264 }
265 }
266
267 private void notifyOnHeaders(HeadersInfo headersInfo)
268 {
269 final StreamFrameListener listener = this.listener;
270 try
271 {
272 if (listener != null)
273 {
274 logger.debug("Invoking headers callback with {} on listener {}", headersInfo, listener);
275 listener.onHeaders(this, headersInfo);
276 }
277 }
278 catch (Exception x)
279 {
280 logger.info("Exception while notifying listener " + listener, x);
281 }
282 catch (Error x)
283 {
284 logger.info("Exception while notifying listener " + listener, x);
285 throw x;
286 }
287 }
288
289 private void notifyOnData(DataInfo dataInfo)
290 {
291 final StreamFrameListener listener = this.listener;
292 try
293 {
294 if (listener != null)
295 {
296 logger.debug("Invoking data callback with {} on listener {}", dataInfo, listener);
297 listener.onData(this, dataInfo);
298 logger.debug("Invoked data callback with {} on listener {}", dataInfo, listener);
299 }
300 }
301 catch (Exception x)
302 {
303 logger.info("Exception while notifying listener " + listener, x);
304 }
305 catch (Error x)
306 {
307 logger.info("Exception while notifying listener " + listener, x);
308 throw x;
309 }
310 }
311
312 @Override
313 public Future<Stream> syn(SynInfo synInfo)
314 {
315 Promise<Stream> result = new Promise<>();
316 syn(synInfo,0,TimeUnit.MILLISECONDS,result);
317 return result;
318 }
319
320 @Override
321 public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler)
322 {
323 if (isClosed() || isReset())
324 {
325 handler.failed(this, new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED));
326 return;
327 }
328 PushSynInfo pushSynInfo = new PushSynInfo(getId(), synInfo);
329 session.syn(pushSynInfo, null, timeout, unit, handler);
330 }
331
332 @Override
333 public Future<Void> reply(ReplyInfo replyInfo)
334 {
335 Promise<Void> result = new Promise<>();
336 reply(replyInfo,0,TimeUnit.MILLISECONDS,result);
337 return result;
338 }
339
340 @Override
341 public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler)
342 {
343 if (isUnidirectional())
344 throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
345 openState = OpenState.REPLY_SENT;
346 updateCloseState(replyInfo.isClose(), true);
347 SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
348 session.control(this, frame, timeout, unit, handler, null);
349 }
350
351 @Override
352 public Future<Void> data(DataInfo dataInfo)
353 {
354 Promise<Void> result = new Promise<>();
355 data(dataInfo,0,TimeUnit.MILLISECONDS,result);
356 return result;
357 }
358
359 @Override
360 public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
361 {
362 if (!canSend())
363 {
364 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
365 throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame");
366 }
367 if (isLocallyClosed())
368 {
369 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
370 throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a closed stream");
371 }
372
373
374
375 session.data(this, dataInfo, timeout, unit, handler, null);
376 }
377
378 @Override
379 public Future<Void> headers(HeadersInfo headersInfo)
380 {
381 Promise<Void> result = new Promise<>();
382 headers(headersInfo,0,TimeUnit.MILLISECONDS,result);
383 return result;
384 }
385
386 @Override
387 public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
388 {
389 if (!canSend())
390 {
391 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
392 throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame");
393 }
394 if (isLocallyClosed())
395 {
396 session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
397 throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream");
398 }
399
400 updateCloseState(headersInfo.isClose(), true);
401 HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
402 session.control(this, frame, timeout, unit, handler, null);
403 }
404
405 @Override
406 public boolean isUnidirectional()
407 {
408 return associatedStream != null;
409 }
410
411 @Override
412 public boolean isReset()
413 {
414 return reset;
415 }
416
417 @Override
418 public boolean isHalfClosed()
419 {
420 CloseState closeState = this.closeState;
421 return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
422 }
423
424 @Override
425 public boolean isClosed()
426 {
427 return closeState == CloseState.CLOSED;
428 }
429
430 private boolean isLocallyClosed()
431 {
432 CloseState closeState = this.closeState;
433 return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.CLOSED;
434 }
435
436 private boolean isRemotelyClosed()
437 {
438 CloseState closeState = this.closeState;
439 return closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
440 }
441
442 @Override
443 public String toString()
444 {
445 return String.format("stream=%d v%d windowSize=%db reset=%s %s %s", getId(), session.getVersion(), getWindowSize(), isReset(), openState, closeState);
446 }
447
448 private boolean canSend()
449 {
450 OpenState openState = this.openState;
451 return openState == OpenState.SYN_SENT || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
452 }
453
454 private boolean canReceive()
455 {
456 OpenState openState = this.openState;
457 return openState == OpenState.SYN_RECV || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
458 }
459
460 private enum OpenState
461 {
462 SYN_SENT, SYN_RECV, REPLY_SENT, REPLY_RECV
463 }
464
465 private enum CloseState
466 {
467 OPENED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
468 }
469 }