View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.http2;
20  
21  import java.io.EOFException;
22  import java.io.IOException;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.ConcurrentMap;
25  import java.util.concurrent.TimeoutException;
26  import java.util.concurrent.atomic.AtomicInteger;
27  import java.util.concurrent.atomic.AtomicReference;
28  
29  import org.eclipse.jetty.http2.api.Stream;
30  import org.eclipse.jetty.http2.frames.DataFrame;
31  import org.eclipse.jetty.http2.frames.Frame;
32  import org.eclipse.jetty.http2.frames.HeadersFrame;
33  import org.eclipse.jetty.http2.frames.PushPromiseFrame;
34  import org.eclipse.jetty.http2.frames.ResetFrame;
35  import org.eclipse.jetty.io.IdleTimeout;
36  import org.eclipse.jetty.util.Callback;
37  import org.eclipse.jetty.util.Promise;
38  import org.eclipse.jetty.util.log.Log;
39  import org.eclipse.jetty.util.log.Logger;
40  import org.eclipse.jetty.util.thread.Scheduler;
41  
42  public class HTTP2Stream extends IdleTimeout implements IStream
43  {
44      private static final Logger LOG = Log.getLogger(HTTP2Stream.class);
45  
46      private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
47      private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
48      private final AtomicInteger sendWindow = new AtomicInteger();
49      private final AtomicInteger recvWindow = new AtomicInteger();
50      private final ISession session;
51      private final int streamId;
52      private volatile Listener listener;
53      private volatile boolean localReset;
54      private volatile boolean remoteReset;
55  
56      public HTTP2Stream(Scheduler scheduler, ISession session, int streamId)
57      {
58          super(scheduler);
59          this.session = session;
60          this.streamId = streamId;
61      }
62  
63      @Override
64      public int getId()
65      {
66          return streamId;
67      }
68  
69      @Override
70      public ISession getSession()
71      {
72          return session;
73      }
74  
75      @Override
76      public void headers(HeadersFrame frame, Callback callback)
77      {
78          notIdle();
79          session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
80      }
81  
82      @Override
83      public void push(PushPromiseFrame frame, Promise<Stream> promise, Listener listener)
84      {
85          notIdle();
86          session.push(this, promise, frame, listener);
87      }
88  
89      @Override
90      public void data(DataFrame frame, Callback callback)
91      {
92          notIdle();
93          session.data(this, callback, frame);
94      }
95  
96      @Override
97      public void reset(ResetFrame frame, Callback callback)
98      {
99          if (isReset())
100             return;
101         notIdle();
102         localReset = true;
103         session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
104     }
105 
106     @Override
107     public Object getAttribute(String key)
108     {
109         return attributes().get(key);
110     }
111 
112     @Override
113     public void setAttribute(String key, Object value)
114     {
115         attributes().put(key, value);
116     }
117 
118     @Override
119     public Object removeAttribute(String key)
120     {
121         return attributes().remove(key);
122     }
123 
124     @Override
125     public boolean isReset()
126     {
127         return localReset || remoteReset;
128     }
129 
130     @Override
131     public boolean isClosed()
132     {
133         return closeState.get() == CloseState.CLOSED;
134     }
135 
136     public boolean isRemotelyClosed()
137     {
138         return closeState.get() == CloseState.REMOTELY_CLOSED;
139     }
140 
141     public boolean isLocallyClosed()
142     {
143         return closeState.get() == CloseState.LOCALLY_CLOSED;
144     }
145 
146     @Override
147     public boolean isOpen()
148     {
149         return !isClosed();
150     }
151 
152     @Override
153     protected void onIdleExpired(TimeoutException timeout)
154     {
155         if (LOG.isDebugEnabled())
156             LOG.debug("Idle timeout {}ms expired on {}", getIdleTimeout(), this);
157 
158         // The stream is now gone, we must close it to
159         // avoid that its idle timeout is rescheduled.
160         close();
161 
162         // Tell the other peer that we timed out.
163         reset(new ResetFrame(getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
164 
165         // Notify the application.
166         notifyTimeout(this, timeout);
167     }
168 
169     private ConcurrentMap<String, Object> attributes()
170     {
171         ConcurrentMap<String, Object> map = attributes.get();
172         if (map == null)
173         {
174             map = new ConcurrentHashMap<>();
175             if (!attributes.compareAndSet(null, map))
176             {
177                 map = attributes.get();
178             }
179         }
180         return map;
181     }
182 
183     @Override
184     public Listener getListener()
185     {
186         return listener;
187     }
188 
189     @Override
190     public void setListener(Listener listener)
191     {
192         this.listener = listener;
193     }
194 
195     @Override
196     public void process(Frame frame, Callback callback)
197     {
198         notIdle();
199         switch (frame.getType())
200         {
201             case HEADERS:
202             {
203                 onHeaders((HeadersFrame)frame, callback);
204                 break;
205             }
206             case DATA:
207             {
208                 onData((DataFrame)frame, callback);
209                 break;
210             }
211             case RST_STREAM:
212             {
213                 onReset((ResetFrame)frame, callback);
214                 break;
215             }
216             case PUSH_PROMISE:
217             {
218                 onPush((PushPromiseFrame)frame, callback);
219                 break;
220             }
221             default:
222             {
223                 throw new UnsupportedOperationException();
224             }
225         }
226     }
227 
228     private void onHeaders(HeadersFrame frame, Callback callback)
229     {
230         if (updateClose(frame.isEndStream(), false))
231             session.removeStream(this, false);
232         callback.succeeded();
233     }
234 
235     private void onData(DataFrame frame, Callback callback)
236     {
237         if (getRecvWindow() < 0)
238         {
239             // It's a bad client, it does not deserve to be
240             // treated gently by just resetting the stream.
241             session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", Callback.NOOP);
242             callback.failed(new IOException("stream_window_exceeded"));
243             return;
244         }
245 
246         // SPEC: remotely closed streams must be replied with a reset.
247         if (isRemotelyClosed())
248         {
249             reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.NOOP);
250             callback.failed(new EOFException("stream_closed"));
251             return;
252         }
253 
254         if (isReset())
255         {
256             // Just drop the frame.
257             callback.failed(new IOException("stream_reset"));
258             return;
259         }
260 
261         if (updateClose(frame.isEndStream(), false))
262             session.removeStream(this, false);
263         notifyData(this, frame, callback);
264     }
265 
266     private void onReset(ResetFrame frame, Callback callback)
267     {
268         remoteReset = true;
269         close();
270         session.removeStream(this, false);
271         callback.succeeded();
272         notifyReset(this, frame);
273     }
274 
275     private void onPush(PushPromiseFrame frame, Callback callback)
276     {
277         // Pushed streams are implicitly locally closed.
278         // They are closed when receiving an end-stream DATA frame.
279         updateClose(true, true);
280         callback.succeeded();
281     }
282 
283     @Override
284     public boolean updateClose(boolean update, boolean local)
285     {
286         if (LOG.isDebugEnabled())
287             LOG.debug("Update close for {} close={} local={}", this, update, local);
288 
289         if (!update)
290             return false;
291 
292         while (true)
293         {
294             CloseState current = closeState.get();
295             switch (current)
296             {
297                 case NOT_CLOSED:
298                 {
299                     CloseState newValue = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
300                     if (closeState.compareAndSet(current, newValue))
301                         return false;
302                     break;
303                 }
304                 case LOCALLY_CLOSED:
305                 {
306                     if (local)
307                         return false;
308                     close();
309                     return true;
310                 }
311                 case REMOTELY_CLOSED:
312                 {
313                     if (!local)
314                         return false;
315                     close();
316                     return true;
317                 }
318                 default:
319                 {
320                     return false;
321                 }
322             }
323         }
324     }
325 
326     public int getSendWindow()
327     {
328         return sendWindow.get();
329     }
330 
331     public int getRecvWindow()
332     {
333         return recvWindow.get();
334     }
335 
336     @Override
337     public int updateSendWindow(int delta)
338     {
339         return sendWindow.getAndAdd(delta);
340     }
341 
342     @Override
343     public int updateRecvWindow(int delta)
344     {
345         return recvWindow.getAndAdd(delta);
346     }
347 
348     @Override
349     public void close()
350     {
351         closeState.set(CloseState.CLOSED);
352         onClose();
353     }
354 
355     private void notifyData(Stream stream, DataFrame frame, Callback callback)
356     {
357         final Listener listener = this.listener;
358         if (listener == null)
359             return;
360         try
361         {
362             listener.onData(stream, frame, callback);
363         }
364         catch (Throwable x)
365         {
366             LOG.info("Failure while notifying listener " + listener, x);
367         }
368     }
369 
370     private void notifyReset(Stream stream, ResetFrame frame)
371     {
372         final Listener listener = this.listener;
373         if (listener == null)
374             return;
375         try
376         {
377             listener.onReset(stream, frame);
378         }
379         catch (Throwable x)
380         {
381             LOG.info("Failure while notifying listener " + listener, x);
382         }
383     }
384 
385     private void notifyTimeout(Stream stream, Throwable failure)
386     {
387         Listener listener = this.listener;
388         if (listener == null)
389             return;
390         try
391         {
392             listener.onTimeout(stream, failure);
393         }
394         catch (Throwable x)
395         {
396             LOG.info("Failure while notifying listener " + listener, x);
397         }
398     }
399 
400     @Override
401     public String toString()
402     {
403         return String.format("%s@%x#%d{sendWindow=%s,recvWindow=%s,reset=%b,%s}", getClass().getSimpleName(),
404                 hashCode(), getId(), sendWindow, recvWindow, isReset(), closeState);
405     }
406 }