1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.http2;
20
21 import java.io.EOFException;
22 import java.io.IOException;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.atomic.AtomicReference;
28
29 import org.eclipse.jetty.http2.api.Stream;
30 import org.eclipse.jetty.http2.frames.DataFrame;
31 import org.eclipse.jetty.http2.frames.Frame;
32 import org.eclipse.jetty.http2.frames.HeadersFrame;
33 import org.eclipse.jetty.http2.frames.PushPromiseFrame;
34 import org.eclipse.jetty.http2.frames.ResetFrame;
35 import org.eclipse.jetty.io.IdleTimeout;
36 import org.eclipse.jetty.util.Callback;
37 import org.eclipse.jetty.util.Promise;
38 import org.eclipse.jetty.util.log.Log;
39 import org.eclipse.jetty.util.log.Logger;
40 import org.eclipse.jetty.util.thread.Scheduler;
41
42 public class HTTP2Stream extends IdleTimeout implements IStream
43 {
44 private static final Logger LOG = Log.getLogger(HTTP2Stream.class);
45
46 private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
47 private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
48 private final AtomicInteger sendWindow = new AtomicInteger();
49 private final AtomicInteger recvWindow = new AtomicInteger();
50 private final ISession session;
51 private final int streamId;
52 private final boolean local;
53 private volatile Listener listener;
54 private volatile boolean localReset;
55 private volatile boolean remoteReset;
56
57 public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, boolean local)
58 {
59 super(scheduler);
60 this.session = session;
61 this.streamId = streamId;
62 this.local = local;
63 }
64
65 @Override
66 public int getId()
67 {
68 return streamId;
69 }
70
71 @Override
72 public boolean isLocal()
73 {
74 return local;
75 }
76
77 @Override
78 public ISession getSession()
79 {
80 return session;
81 }
82
83 @Override
84 public void headers(HeadersFrame frame, Callback callback)
85 {
86 notIdle();
87 session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
88 }
89
90 @Override
91 public void push(PushPromiseFrame frame, Promise<Stream> promise, Listener listener)
92 {
93 notIdle();
94 session.push(this, promise, frame, listener);
95 }
96
97 @Override
98 public void data(DataFrame frame, Callback callback)
99 {
100 notIdle();
101 session.data(this, callback, frame);
102 }
103
104 @Override
105 public void reset(ResetFrame frame, Callback callback)
106 {
107 if (isReset())
108 return;
109 notIdle();
110 localReset = true;
111 session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
112 }
113
114 @Override
115 public Object getAttribute(String key)
116 {
117 return attributes().get(key);
118 }
119
120 @Override
121 public void setAttribute(String key, Object value)
122 {
123 attributes().put(key, value);
124 }
125
126 @Override
127 public Object removeAttribute(String key)
128 {
129 return attributes().remove(key);
130 }
131
132 @Override
133 public boolean isReset()
134 {
135 return localReset || remoteReset;
136 }
137
138 @Override
139 public boolean isClosed()
140 {
141 return closeState.get() == CloseState.CLOSED;
142 }
143
144 public boolean isRemotelyClosed()
145 {
146 return closeState.get() == CloseState.REMOTELY_CLOSED;
147 }
148
149 public boolean isLocallyClosed()
150 {
151 return closeState.get() == CloseState.LOCALLY_CLOSED;
152 }
153
154 @Override
155 public boolean isOpen()
156 {
157 return !isClosed();
158 }
159
160 @Override
161 protected void onIdleExpired(TimeoutException timeout)
162 {
163 if (LOG.isDebugEnabled())
164 LOG.debug("Idle timeout {}ms expired on {}", getIdleTimeout(), this);
165
166
167
168 close();
169
170
171 reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
172
173
174 notifyTimeout(this, timeout);
175 }
176
177 private ConcurrentMap<String, Object> attributes()
178 {
179 ConcurrentMap<String, Object> map = attributes.get();
180 if (map == null)
181 {
182 map = new ConcurrentHashMap<>();
183 if (!attributes.compareAndSet(null, map))
184 {
185 map = attributes.get();
186 }
187 }
188 return map;
189 }
190
191 @Override
192 public Listener getListener()
193 {
194 return listener;
195 }
196
197 @Override
198 public void setListener(Listener listener)
199 {
200 this.listener = listener;
201 }
202
203 @Override
204 public void process(Frame frame, Callback callback)
205 {
206 notIdle();
207 switch (frame.getType())
208 {
209 case HEADERS:
210 {
211 onHeaders((HeadersFrame)frame, callback);
212 break;
213 }
214 case DATA:
215 {
216 onData((DataFrame)frame, callback);
217 break;
218 }
219 case RST_STREAM:
220 {
221 onReset((ResetFrame)frame, callback);
222 break;
223 }
224 case PUSH_PROMISE:
225 {
226 onPush((PushPromiseFrame)frame, callback);
227 break;
228 }
229 default:
230 {
231 throw new UnsupportedOperationException();
232 }
233 }
234 }
235
236 private void onHeaders(HeadersFrame frame, Callback callback)
237 {
238 if (updateClose(frame.isEndStream(), false))
239 session.removeStream(this);
240 callback.succeeded();
241 }
242
243 private void onData(DataFrame frame, Callback callback)
244 {
245 if (getRecvWindow() < 0)
246 {
247
248
249 session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", Callback.NOOP);
250 callback.failed(new IOException("stream_window_exceeded"));
251 return;
252 }
253
254
255 if (isRemotelyClosed())
256 {
257 reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.NOOP);
258 callback.failed(new EOFException("stream_closed"));
259 return;
260 }
261
262 if (isReset())
263 {
264
265 callback.failed(new IOException("stream_reset"));
266 return;
267 }
268
269 if (updateClose(frame.isEndStream(), false))
270 session.removeStream(this);
271 notifyData(this, frame, callback);
272 }
273
274 private void onReset(ResetFrame frame, Callback callback)
275 {
276 remoteReset = true;
277 close();
278 session.removeStream(this);
279 callback.succeeded();
280 notifyReset(this, frame);
281 }
282
283 private void onPush(PushPromiseFrame frame, Callback callback)
284 {
285
286
287 updateClose(true, true);
288 callback.succeeded();
289 }
290
291 @Override
292 public boolean updateClose(boolean update, boolean local)
293 {
294 if (LOG.isDebugEnabled())
295 LOG.debug("Update close for {} close={} local={}", this, update, local);
296
297 if (!update)
298 return false;
299
300 while (true)
301 {
302 CloseState current = closeState.get();
303 switch (current)
304 {
305 case NOT_CLOSED:
306 {
307 CloseState newValue = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
308 if (closeState.compareAndSet(current, newValue))
309 return false;
310 break;
311 }
312 case LOCALLY_CLOSED:
313 {
314 if (local)
315 return false;
316 close();
317 return true;
318 }
319 case REMOTELY_CLOSED:
320 {
321 if (!local)
322 return false;
323 close();
324 return true;
325 }
326 default:
327 {
328 return false;
329 }
330 }
331 }
332 }
333
334 public int getSendWindow()
335 {
336 return sendWindow.get();
337 }
338
339 public int getRecvWindow()
340 {
341 return recvWindow.get();
342 }
343
344 @Override
345 public int updateSendWindow(int delta)
346 {
347 return sendWindow.getAndAdd(delta);
348 }
349
350 @Override
351 public int updateRecvWindow(int delta)
352 {
353 return recvWindow.getAndAdd(delta);
354 }
355
356 @Override
357 public void close()
358 {
359 closeState.set(CloseState.CLOSED);
360 onClose();
361 }
362
363 private void notifyData(Stream stream, DataFrame frame, Callback callback)
364 {
365 final Listener listener = this.listener;
366 if (listener == null)
367 return;
368 try
369 {
370 listener.onData(stream, frame, callback);
371 }
372 catch (Throwable x)
373 {
374 LOG.info("Failure while notifying listener " + listener, x);
375 }
376 }
377
378 private void notifyReset(Stream stream, ResetFrame frame)
379 {
380 final Listener listener = this.listener;
381 if (listener == null)
382 return;
383 try
384 {
385 listener.onReset(stream, frame);
386 }
387 catch (Throwable x)
388 {
389 LOG.info("Failure while notifying listener " + listener, x);
390 }
391 }
392
393 private void notifyTimeout(Stream stream, Throwable failure)
394 {
395 Listener listener = this.listener;
396 if (listener == null)
397 return;
398 try
399 {
400 listener.onTimeout(stream, failure);
401 }
402 catch (Throwable x)
403 {
404 LOG.info("Failure while notifying listener " + listener, x);
405 }
406 }
407
408 @Override
409 public String toString()
410 {
411 return String.format("%s@%x#%d{sendWindow=%s,recvWindow=%s,reset=%b,%s}", getClass().getSimpleName(),
412 hashCode(), getId(), sendWindow, recvWindow, isReset(), closeState);
413 }
414 }