View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.ConcurrentMap;
25  import java.util.concurrent.TimeoutException;
26  import java.util.concurrent.atomic.AtomicInteger;
27  import java.util.concurrent.atomic.AtomicReference;
28  
29  import org.eclipse.jetty.http2.api.Stream;
30  import org.eclipse.jetty.http2.frames.DataFrame;
31  import org.eclipse.jetty.http2.frames.Frame;
32  import org.eclipse.jetty.http2.frames.HeadersFrame;
33  import org.eclipse.jetty.http2.frames.PushPromiseFrame;
34  import org.eclipse.jetty.http2.frames.ResetFrame;
35  import org.eclipse.jetty.io.IdleTimeout;
36  import org.eclipse.jetty.util.Callback;
37  import org.eclipse.jetty.util.Promise;
38  import org.eclipse.jetty.util.log.Log;
39  import org.eclipse.jetty.util.log.Logger;
40  import org.eclipse.jetty.util.thread.Scheduler;
41  
42  public class HTTP2Stream extends IdleTimeout implements IStream
43  {
44      private static final Logger LOG = Log.getLogger(HTTP2Stream.class);
45  
46      private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
47      private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
48      private final AtomicInteger sendWindow = new AtomicInteger();
49      private final AtomicInteger recvWindow = new AtomicInteger();
50      private final ISession session;
51      private final int streamId;
52      private final boolean local;
53      private volatile Listener listener;
54      private volatile boolean localReset;
55      private volatile boolean remoteReset;
56  
57      public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, boolean local)
58      {
59          super(scheduler);
60          this.session = session;
61          this.streamId = streamId;
62          this.local = local;
63      }
64  
65      @Override
66      public int getId()
67      {
68          return streamId;
69      }
70  
71      @Override
72      public boolean isLocal()
73      {
74          return local;
75      }
76  
77      @Override
78      public ISession getSession()
79      {
80          return session;
81      }
82  
83      @Override
84      public void headers(HeadersFrame frame, Callback callback)
85      {
86          notIdle();
87          session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
88      }
89  
90      @Override
91      public void push(PushPromiseFrame frame, Promise<Stream> promise, Listener listener)
92      {
93          notIdle();
94          session.push(this, promise, frame, listener);
95      }
96  
97      @Override
98      public void data(DataFrame frame, Callback callback)
99      {
100         notIdle();
101         session.data(this, callback, frame);
102     }
103 
104     @Override
105     public void reset(ResetFrame frame, Callback callback)
106     {
107         if (isReset())
108             return;
109         notIdle();
110         localReset = true;
111         session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
112     }
113 
114     @Override
115     public Object getAttribute(String key)
116     {
117         return attributes().get(key);
118     }
119 
120     @Override
121     public void setAttribute(String key, Object value)
122     {
123         attributes().put(key, value);
124     }
125 
126     @Override
127     public Object removeAttribute(String key)
128     {
129         return attributes().remove(key);
130     }
131 
132     @Override
133     public boolean isReset()
134     {
135         return localReset || remoteReset;
136     }
137 
138     @Override
139     public boolean isClosed()
140     {
141         return closeState.get() == CloseState.CLOSED;
142     }
143 
144     public boolean isRemotelyClosed()
145     {
146         return closeState.get() == CloseState.REMOTELY_CLOSED;
147     }
148 
149     public boolean isLocallyClosed()
150     {
151         return closeState.get() == CloseState.LOCALLY_CLOSED;
152     }
153 
154     @Override
155     public boolean isOpen()
156     {
157         return !isClosed();
158     }
159 
160     @Override
161     protected void onIdleExpired(TimeoutException timeout)
162     {
163         if (LOG.isDebugEnabled())
164             LOG.debug("Idle timeout {}ms expired on {}", getIdleTimeout(), this);
165 
166         // The stream is now gone, we must close it to
167         // avoid that its idle timeout is rescheduled.
168         close();
169 
170         // Tell the other peer that we timed out.
171         reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
172 
173         // Notify the application.
174         notifyTimeout(this, timeout);
175     }
176 
177     private ConcurrentMap<String, Object> attributes()
178     {
179         ConcurrentMap<String, Object> map = attributes.get();
180         if (map == null)
181         {
182             map = new ConcurrentHashMap<>();
183             if (!attributes.compareAndSet(null, map))
184             {
185                 map = attributes.get();
186             }
187         }
188         return map;
189     }
190 
191     @Override
192     public Listener getListener()
193     {
194         return listener;
195     }
196 
197     @Override
198     public void setListener(Listener listener)
199     {
200         this.listener = listener;
201     }
202 
203     @Override
204     public void process(Frame frame, Callback callback)
205     {
206         notIdle();
207         switch (frame.getType())
208         {
209             case HEADERS:
210             {
211                 onHeaders((HeadersFrame)frame, callback);
212                 break;
213             }
214             case DATA:
215             {
216                 onData((DataFrame)frame, callback);
217                 break;
218             }
219             case RST_STREAM:
220             {
221                 onReset((ResetFrame)frame, callback);
222                 break;
223             }
224             case PUSH_PROMISE:
225             {
226                 onPush((PushPromiseFrame)frame, callback);
227                 break;
228             }
229             default:
230             {
231                 throw new UnsupportedOperationException();
232             }
233         }
234     }
235 
236     private void onHeaders(HeadersFrame frame, Callback callback)
237     {
238         if (updateClose(frame.isEndStream(), false))
239             session.removeStream(this);
240         callback.succeeded();
241     }
242 
243     private void onData(DataFrame frame, Callback callback)
244     {
245         if (getRecvWindow() < 0)
246         {
247             // It's a bad client, it does not deserve to be
248             // treated gently by just resetting the stream.
249             session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", Callback.NOOP);
250             callback.failed(new IOException("stream_window_exceeded"));
251             return;
252         }
253 
254         // SPEC: remotely closed streams must be replied with a reset.
255         if (isRemotelyClosed())
256         {
257             reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.NOOP);
258             callback.failed(new EOFException("stream_closed"));
259             return;
260         }
261 
262         if (isReset())
263         {
264             // Just drop the frame.
265             callback.failed(new IOException("stream_reset"));
266             return;
267         }
268 
269         if (updateClose(frame.isEndStream(), false))
270             session.removeStream(this);
271         notifyData(this, frame, callback);
272     }
273 
274     private void onReset(ResetFrame frame, Callback callback)
275     {
276         remoteReset = true;
277         close();
278         session.removeStream(this);
279         callback.succeeded();
280         notifyReset(this, frame);
281     }
282 
283     private void onPush(PushPromiseFrame frame, Callback callback)
284     {
285         // Pushed streams are implicitly locally closed.
286         // They are closed when receiving an end-stream DATA frame.
287         updateClose(true, true);
288         callback.succeeded();
289     }
290 
291     @Override
292     public boolean updateClose(boolean update, boolean local)
293     {
294         if (LOG.isDebugEnabled())
295             LOG.debug("Update close for {} close={} local={}", this, update, local);
296 
297         if (!update)
298             return false;
299 
300         while (true)
301         {
302             CloseState current = closeState.get();
303             switch (current)
304             {
305                 case NOT_CLOSED:
306                 {
307                     CloseState newValue = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
308                     if (closeState.compareAndSet(current, newValue))
309                         return false;
310                     break;
311                 }
312                 case LOCALLY_CLOSED:
313                 {
314                     if (local)
315                         return false;
316                     close();
317                     return true;
318                 }
319                 case REMOTELY_CLOSED:
320                 {
321                     if (!local)
322                         return false;
323                     close();
324                     return true;
325                 }
326                 default:
327                 {
328                     return false;
329                 }
330             }
331         }
332     }
333 
334     public int getSendWindow()
335     {
336         return sendWindow.get();
337     }
338 
339     public int getRecvWindow()
340     {
341         return recvWindow.get();
342     }
343 
344     @Override
345     public int updateSendWindow(int delta)
346     {
347         return sendWindow.getAndAdd(delta);
348     }
349 
350     @Override
351     public int updateRecvWindow(int delta)
352     {
353         return recvWindow.getAndAdd(delta);
354     }
355 
356     @Override
357     public void close()
358     {
359         closeState.set(CloseState.CLOSED);
360         onClose();
361     }
362 
363     private void notifyData(Stream stream, DataFrame frame, Callback callback)
364     {
365         final Listener listener = this.listener;
366         if (listener == null)
367             return;
368         try
369         {
370             listener.onData(stream, frame, callback);
371         }
372         catch (Throwable x)
373         {
374             LOG.info("Failure while notifying listener " + listener, x);
375         }
376     }
377 
378     private void notifyReset(Stream stream, ResetFrame frame)
379     {
380         final Listener listener = this.listener;
381         if (listener == null)
382             return;
383         try
384         {
385             listener.onReset(stream, frame);
386         }
387         catch (Throwable x)
388         {
389             LOG.info("Failure while notifying listener " + listener, x);
390         }
391     }
392 
393     private void notifyTimeout(Stream stream, Throwable failure)
394     {
395         Listener listener = this.listener;
396         if (listener == null)
397             return;
398         try
399         {
400             listener.onTimeout(stream, failure);
401         }
402         catch (Throwable x)
403         {
404             LOG.info("Failure while notifying listener " + listener, x);
405         }
406     }
407 
408     @Override
409     public String toString()
410     {
411         return String.format("%s@%x#%d{sendWindow=%s,recvWindow=%s,reset=%b,%s}", getClass().getSimpleName(),
412                 hashCode(), getId(), sendWindow, recvWindow, isReset(), closeState);
413     }
414 }