View Javadoc

1   //========================================================================
2   //Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
3   //------------------------------------------------------------------------
4   //All rights reserved. This program and the accompanying materials
5   //are made available under the terms of the Eclipse Public License v1.0
6   //and Apache License v2.0 which accompanies this distribution.
7   //The Eclipse Public License is available at
8   //http://www.eclipse.org/legal/epl-v10.html
9   //The Apache License v2.0 is available at
10  //http://www.opensource.org/licenses/apache2.0.php
11  //You may elect to redistribute this code under either of these licenses.
12  //========================================================================
13  
14  package org.eclipse.jetty.spdy;
15  
16  import java.util.Collections;
17  import java.util.Map;
18  import java.util.Set;
19  import java.util.concurrent.ConcurrentHashMap;
20  import java.util.concurrent.Future;
21  import java.util.concurrent.TimeUnit;
22  import java.util.concurrent.atomic.AtomicInteger;
23  
24  import org.eclipse.jetty.spdy.api.DataInfo;
25  import org.eclipse.jetty.spdy.api.Handler;
26  import org.eclipse.jetty.spdy.api.HeadersInfo;
27  import org.eclipse.jetty.spdy.api.ReplyInfo;
28  import org.eclipse.jetty.spdy.api.RstInfo;
29  import org.eclipse.jetty.spdy.api.Stream;
30  import org.eclipse.jetty.spdy.api.StreamFrameListener;
31  import org.eclipse.jetty.spdy.api.StreamStatus;
32  import org.eclipse.jetty.spdy.api.SynInfo;
33  import org.eclipse.jetty.spdy.frames.ControlFrame;
34  import org.eclipse.jetty.spdy.frames.HeadersFrame;
35  import org.eclipse.jetty.spdy.frames.SynReplyFrame;
36  import org.eclipse.jetty.util.log.Log;
37  import org.eclipse.jetty.util.log.Logger;
38  
39  public class StandardStream implements IStream
40  {
41      private static final Logger logger = Log.getLogger(Stream.class);
42      private final Map<String, Object> attributes = new ConcurrentHashMap<>();
43      private final int id;
44      private final byte priority;
45      private final ISession session;
46      private final IStream associatedStream;
47      private final AtomicInteger windowSize = new AtomicInteger();
48      private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>());
49      private volatile StreamFrameListener listener;
50      private volatile OpenState openState = OpenState.SYN_SENT;
51      private volatile CloseState closeState = CloseState.OPENED;
52      private volatile boolean reset = false;
53  
54      public StandardStream(int id, byte priority, ISession session, IStream associatedStream)
55      {
56          this.id = id;
57          this.priority = priority;
58          this.session = session;
59          this.associatedStream = associatedStream;
60      }
61  
62      @Override
63      public int getId()
64      {
65          return id;
66      }
67  
68      @Override
69      public IStream getAssociatedStream()
70      {
71          return associatedStream;
72      }
73  
74      @Override
75      public Set<Stream> getPushedStreams()
76      {
77          return pushedStreams;
78      }
79  
80      @Override
81      public void associate(IStream stream)
82      {
83          pushedStreams.add(stream);
84      }
85  
86      @Override
87      public void disassociate(IStream stream)
88      {
89          pushedStreams.remove(stream);
90      }
91  
92      @Override
93      public byte getPriority()
94      {
95          return priority;
96      }
97  
98      @Override
99      public int getWindowSize()
100     {
101         return windowSize.get();
102     }
103 
104     @Override
105     public void updateWindowSize(int delta)
106     {
107         int size = windowSize.addAndGet(delta);
108         logger.debug("Updated window size {} -> {} for {}", size - delta, size, this);
109     }
110 
111     @Override
112     public ISession getSession()
113     {
114         return session;
115     }
116 
117     @Override
118     public Object getAttribute(String key)
119     {
120         return attributes.get(key);
121     }
122 
123     @Override
124     public void setAttribute(String key, Object value)
125     {
126         attributes.put(key,value);
127     }
128 
129     @Override
130     public Object removeAttribute(String key)
131     {
132         return attributes.remove(key);
133     }
134 
135     @Override
136     public void setStreamFrameListener(StreamFrameListener listener)
137     {
138         this.listener = listener;
139     }
140 
141     public StreamFrameListener getStreamFrameListener()
142     {
143         return listener;
144     }
145 
146     @Override
147     public void updateCloseState(boolean close, boolean local)
148     {
149         if (close)
150         {
151             switch (closeState)
152             {
153                 case OPENED:
154                 {
155                     closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
156                     break;
157                 }
158                 case LOCALLY_CLOSED:
159                 {
160                     if (local)
161                         throw new IllegalStateException();
162                     else
163                         closeState = CloseState.CLOSED;
164                     break;
165                 }
166                 case REMOTELY_CLOSED:
167                 {
168                     if (local)
169                         closeState = CloseState.CLOSED;
170                     else
171                         throw new IllegalStateException();
172                     break;
173                 }
174                 default:
175                 {
176                     throw new IllegalStateException();
177                 }
178             }
179         }
180     }
181 
182     @Override
183     public void process(ControlFrame frame)
184     {
185         switch (frame.getType())
186         {
187             case SYN_STREAM:
188             {
189                 openState = OpenState.SYN_RECV;
190                 break;
191             }
192             case SYN_REPLY:
193             {
194                 openState = OpenState.REPLY_RECV;
195                 SynReplyFrame synReply = (SynReplyFrame)frame;
196                 updateCloseState(synReply.isClose(), false);
197                 ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(), synReply.isClose());
198                 notifyOnReply(replyInfo);
199                 break;
200             }
201             case HEADERS:
202             {
203                 HeadersFrame headers = (HeadersFrame)frame;
204                 updateCloseState(headers.isClose(), false);
205                 HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(), headers.isClose(), headers.isResetCompression());
206                 notifyOnHeaders(headersInfo);
207                 break;
208             }
209             case RST_STREAM:
210             {
211                 reset = true;
212                 break;
213             }
214             default:
215             {
216                 throw new IllegalStateException();
217             }
218         }
219         session.flush();
220     }
221 
222     @Override
223     public void process(DataInfo dataInfo)
224     {
225         // TODO: in v3 we need to send a rst instead of just ignoring
226         // ignore data frame if this stream is remotelyClosed already
227         if (isRemotelyClosed())
228         {
229             logger.debug("Stream is remotely closed, ignoring {}", dataInfo);
230             return;
231         }
232 
233         if (!canReceive())
234         {
235             logger.debug("Protocol error receiving {}, resetting" + dataInfo);
236             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
237             return;
238         }
239 
240         updateCloseState(dataInfo.isClose(), false);
241         notifyOnData(dataInfo);
242         session.flush();
243     }
244 
245     private void notifyOnReply(ReplyInfo replyInfo)
246     {
247         final StreamFrameListener listener = this.listener;
248         try
249         {
250             if (listener != null)
251             {
252                 logger.debug("Invoking reply callback with {} on listener {}", replyInfo, listener);
253                 listener.onReply(this, replyInfo);
254             }
255         }
256         catch (Exception x)
257         {
258             logger.info("Exception while notifying listener " + listener, x);
259         }
260         catch (Error x)
261         {
262             logger.info("Exception while notifying listener " + listener, x);
263             throw x;
264         }
265     }
266 
267     private void notifyOnHeaders(HeadersInfo headersInfo)
268     {
269         final StreamFrameListener listener = this.listener;
270         try
271         {
272             if (listener != null)
273             {
274                 logger.debug("Invoking headers callback with {} on listener {}", headersInfo, listener);
275                 listener.onHeaders(this, headersInfo);
276             }
277         }
278         catch (Exception x)
279         {
280             logger.info("Exception while notifying listener " + listener, x);
281         }
282         catch (Error x)
283         {
284             logger.info("Exception while notifying listener " + listener, x);
285             throw x;
286         }
287     }
288 
289     private void notifyOnData(DataInfo dataInfo)
290     {
291         final StreamFrameListener listener = this.listener;
292         try
293         {
294             if (listener != null)
295             {
296                 logger.debug("Invoking data callback with {} on listener {}", dataInfo, listener);
297                 listener.onData(this, dataInfo);
298                 logger.debug("Invoked data callback with {} on listener {}", dataInfo, listener);
299             }
300         }
301         catch (Exception x)
302         {
303             logger.info("Exception while notifying listener " + listener, x);
304         }
305         catch (Error x)
306         {
307             logger.info("Exception while notifying listener " + listener, x);
308             throw x;
309         }
310     }
311 
312     @Override
313     public Future<Stream> syn(SynInfo synInfo)
314     {
315         Promise<Stream> result = new Promise<>();
316         syn(synInfo,0,TimeUnit.MILLISECONDS,result);
317         return result;
318     }
319 
320     @Override
321     public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler)
322     {
323         if (isClosed() || isReset())
324         {
325             handler.failed(this, new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED));
326             return;
327         }
328         PushSynInfo pushSynInfo = new PushSynInfo(getId(), synInfo);
329         session.syn(pushSynInfo, null, timeout, unit, handler);
330     }
331 
332     @Override
333     public Future<Void> reply(ReplyInfo replyInfo)
334     {
335         Promise<Void> result = new Promise<>();
336         reply(replyInfo,0,TimeUnit.MILLISECONDS,result);
337         return result;
338     }
339 
340     @Override
341     public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler)
342     {
343         if (isUnidirectional())
344             throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
345         openState = OpenState.REPLY_SENT;
346         updateCloseState(replyInfo.isClose(), true);
347         SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
348         session.control(this, frame, timeout, unit, handler, null);
349     }
350 
351     @Override
352     public Future<Void> data(DataInfo dataInfo)
353     {
354         Promise<Void> result = new Promise<>();
355         data(dataInfo,0,TimeUnit.MILLISECONDS,result);
356         return result;
357     }
358 
359     @Override
360     public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
361     {
362         if (!canSend())
363         {
364             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
365             throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame");
366         }
367         if (isLocallyClosed())
368         {
369             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
370             throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a closed stream");
371         }
372 
373         // Cannot update the close state here, because the data that we send may
374         // be flow controlled, so we need the stream to update the window size.
375         session.data(this, dataInfo, timeout, unit, handler, null);
376     }
377 
378     @Override
379     public Future<Void> headers(HeadersInfo headersInfo)
380     {
381         Promise<Void> result = new Promise<>();
382         headers(headersInfo,0,TimeUnit.MILLISECONDS,result);
383         return result;
384     }
385 
386     @Override
387     public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
388     {
389         if (!canSend())
390         {
391             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
392             throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame");
393         }
394         if (isLocallyClosed())
395         {
396             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
397             throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream");
398         }
399 
400         updateCloseState(headersInfo.isClose(), true);
401         HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
402         session.control(this, frame, timeout, unit, handler, null);
403     }
404 
405     @Override
406     public boolean isUnidirectional()
407     {
408         return associatedStream != null;
409     }
410 
411     @Override
412     public boolean isReset()
413     {
414         return reset;
415     }
416 
417     @Override
418     public boolean isHalfClosed()
419     {
420         CloseState closeState = this.closeState;
421         return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
422     }
423 
424     @Override
425     public boolean isClosed()
426     {
427         return closeState == CloseState.CLOSED;
428     }
429 
430     private boolean isLocallyClosed()
431     {
432         CloseState closeState = this.closeState;
433         return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.CLOSED;
434     }
435 
436     private boolean isRemotelyClosed()
437     {
438         CloseState closeState = this.closeState;
439         return closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
440     }
441 
442     @Override
443     public String toString()
444     {
445         return String.format("stream=%d v%d windowSize=%db reset=%s %s %s", getId(), session.getVersion(), getWindowSize(), isReset(), openState, closeState);
446     }
447 
448     private boolean canSend()
449     {
450         OpenState openState = this.openState;
451         return openState == OpenState.SYN_SENT || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
452     }
453 
454     private boolean canReceive()
455     {
456         OpenState openState = this.openState;
457         return openState == OpenState.SYN_RECV || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
458     }
459 
460     private enum OpenState
461     {
462         SYN_SENT, SYN_RECV, REPLY_SENT, REPLY_RECV
463     }
464 
465     private enum CloseState
466     {
467         OPENED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
468     }
469 }