1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.http2;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import org.eclipse.jetty.http2.api.Stream;
30 import org.eclipse.jetty.http2.frames.DataFrame;
31 import org.eclipse.jetty.http2.frames.Frame;
32 import org.eclipse.jetty.http2.frames.HeadersFrame;
33 import org.eclipse.jetty.http2.frames.PushPromiseFrame;
34 import org.eclipse.jetty.http2.frames.ResetFrame;
35 import org.eclipse.jetty.io.IdleTimeout;
36 import org.eclipse.jetty.util.Callback;
37 import org.eclipse.jetty.util.Promise;
38 import org.eclipse.jetty.util.log.Log;
39 import org.eclipse.jetty.util.log.Logger;
40 import org.eclipse.jetty.util.thread.Scheduler;
41
42 public class HTTP2Stream extends IdleTimeout implements IStream
43 {
44 private static final Logger LOG = Log.getLogger(HTTP2Stream.class);
45
46 private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
47 private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
48 private final AtomicInteger sendWindow = new AtomicInteger();
49 private final AtomicInteger recvWindow = new AtomicInteger();
50 private final ISession session;
51 private final int streamId;
52 private volatile Listener listener;
53 private volatile boolean localReset;
54 private volatile boolean remoteReset;
55
56 public HTTP2Stream(Scheduler scheduler, ISession session, int streamId)
57 {
58 super(scheduler);
59 this.session = session;
60 this.streamId = streamId;
61 }
62
63 @Override
64 public int getId()
65 {
66 return streamId;
67 }
68
69 @Override
70 public ISession getSession()
71 {
72 return session;
73 }
74
75 @Override
76 public void headers(HeadersFrame frame, Callback callback)
77 {
78 notIdle();
79 session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
80 }
81
82 @Override
83 public void push(PushPromiseFrame frame, Promise<Stream> promise, Listener listener)
84 {
85 notIdle();
86 session.push(this, promise, frame, listener);
87 }
88
89 @Override
90 public void data(DataFrame frame, Callback callback)
91 {
92 notIdle();
93 session.data(this, callback, frame);
94 }
95
96 @Override
97 public void reset(ResetFrame frame, Callback callback)
98 {
99 if (isReset())
100 return;
101 notIdle();
102 localReset = true;
103 session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
104 }
105
106 @Override
107 public Object getAttribute(String key)
108 {
109 return attributes().get(key);
110 }
111
112 @Override
113 public void setAttribute(String key, Object value)
114 {
115 attributes().put(key, value);
116 }
117
118 @Override
119 public Object removeAttribute(String key)
120 {
121 return attributes().remove(key);
122 }
123
124 @Override
125 public boolean isReset()
126 {
127 return localReset || remoteReset;
128 }
129
130 @Override
131 public boolean isClosed()
132 {
133 return closeState.get() == CloseState.CLOSED;
134 }
135
136 public boolean isRemotelyClosed()
137 {
138 return closeState.get() == CloseState.REMOTELY_CLOSED;
139 }
140
141 public boolean isLocallyClosed()
142 {
143 return closeState.get() == CloseState.LOCALLY_CLOSED;
144 }
145
146 @Override
147 public boolean isOpen()
148 {
149 return !isClosed();
150 }
151
152 @Override
153 protected void onIdleExpired(TimeoutException timeout)
154 {
155 if (LOG.isDebugEnabled())
156 LOG.debug("Idle timeout {}ms expired on {}", getIdleTimeout(), this);
157
158
159
160 close();
161
162
163 reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
164
165
166 notifyTimeout(this, timeout);
167 }
168
169 private ConcurrentMap<String, Object> attributes()
170 {
171 ConcurrentMap<String, Object> map = attributes.get();
172 if (map == null)
173 {
174 map = new ConcurrentHashMap<>();
175 if (!attributes.compareAndSet(null, map))
176 {
177 map = attributes.get();
178 }
179 }
180 return map;
181 }
182
183 @Override
184 public Listener getListener()
185 {
186 return listener;
187 }
188
189 @Override
190 public void setListener(Listener listener)
191 {
192 this.listener = listener;
193 }
194
195 @Override
196 public void process(Frame frame, Callback callback)
197 {
198 notIdle();
199 switch (frame.getType())
200 {
201 case HEADERS:
202 {
203 onHeaders((HeadersFrame)frame, callback);
204 break;
205 }
206 case DATA:
207 {
208 onData((DataFrame)frame, callback);
209 break;
210 }
211 case RST_STREAM:
212 {
213 onReset((ResetFrame)frame, callback);
214 break;
215 }
216 case PUSH_PROMISE:
217 {
218 onPush((PushPromiseFrame)frame, callback);
219 break;
220 }
221 default:
222 {
223 throw new UnsupportedOperationException();
224 }
225 }
226 }
227
228 private void onHeaders(HeadersFrame frame, Callback callback)
229 {
230 if (updateClose(frame.isEndStream(), false))
231 session.removeStream(this, false);
232 callback.succeeded();
233 }
234
235 private void onData(DataFrame frame, Callback callback)
236 {
237 if (getRecvWindow() < 0)
238 {
239
240
241 session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", Callback.NOOP);
242 callback.failed(new IOException("stream_window_exceeded"));
243 return;
244 }
245
246
247 if (isRemotelyClosed())
248 {
249 reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.NOOP);
250 callback.failed(new EOFException("stream_closed"));
251 return;
252 }
253
254 if (isReset())
255 {
256
257 callback.failed(new IOException("stream_reset"));
258 return;
259 }
260
261 if (updateClose(frame.isEndStream(), false))
262 session.removeStream(this, false);
263 notifyData(this, frame, callback);
264 }
265
266 private void onReset(ResetFrame frame, Callback callback)
267 {
268 remoteReset = true;
269 close();
270 session.removeStream(this, false);
271 callback.succeeded();
272 notifyReset(this, frame);
273 }
274
275 private void onPush(PushPromiseFrame frame, Callback callback)
276 {
277
278
279 updateClose(true, true);
280 callback.succeeded();
281 }
282
283 @Override
284 public boolean updateClose(boolean update, boolean local)
285 {
286 if (LOG.isDebugEnabled())
287 LOG.debug("Update close for {} close={} local={}", this, update, local);
288
289 if (!update)
290 return false;
291
292 while (true)
293 {
294 CloseState current = closeState.get();
295 switch (current)
296 {
297 case NOT_CLOSED:
298 {
299 CloseState newValue = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
300 if (closeState.compareAndSet(current, newValue))
301 return false;
302 break;
303 }
304 case LOCALLY_CLOSED:
305 {
306 if (local)
307 return false;
308 close();
309 return true;
310 }
311 case REMOTELY_CLOSED:
312 {
313 if (!local)
314 return false;
315 close();
316 return true;
317 }
318 default:
319 {
320 return false;
321 }
322 }
323 }
324 }
325
326 public int getSendWindow()
327 {
328 return sendWindow.get();
329 }
330
331 public int getRecvWindow()
332 {
333 return recvWindow.get();
334 }
335
336 @Override
337 public int updateSendWindow(int delta)
338 {
339 return sendWindow.getAndAdd(delta);
340 }
341
342 @Override
343 public int updateRecvWindow(int delta)
344 {
345 return recvWindow.getAndAdd(delta);
346 }
347
348 @Override
349 public void close()
350 {
351 closeState.set(CloseState.CLOSED);
352 onClose();
353 }
354
355 private void notifyData(Stream stream, DataFrame frame, Callback callback)
356 {
357 final Listener listener = this.listener;
358 if (listener == null)
359 return;
360 try
361 {
362 listener.onData(stream, frame, callback);
363 }
364 catch (Throwable x)
365 {
366 LOG.info("Failure while notifying listener " + listener, x);
367 }
368 }
369
370 private void notifyReset(Stream stream, ResetFrame frame)
371 {
372 final Listener listener = this.listener;
373 if (listener == null)
374 return;
375 try
376 {
377 listener.onReset(stream, frame);
378 }
379 catch (Throwable x)
380 {
381 LOG.info("Failure while notifying listener " + listener, x);
382 }
383 }
384
385 private void notifyTimeout(Stream stream, Throwable failure)
386 {
387 Listener listener = this.listener;
388 if (listener == null)
389 return;
390 try
391 {
392 listener.onTimeout(stream, failure);
393 }
394 catch (Throwable x)
395 {
396 LOG.info("Failure while notifying listener " + listener, x);
397 }
398 }
399
400 @Override
401 public String toString()
402 {
403 return String.format("%s@%x#%d{sendWindow=%s,recvWindow=%s,reset=%b,%s}", getClass().getSimpleName(),
404 hashCode(), getId(), sendWindow, recvWindow, isReset(), closeState);
405 }
406 }