View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy;
20  
21  import java.util.Collections;
22  import java.util.Map;
23  import java.util.Set;
24  import java.util.concurrent.ConcurrentHashMap;
25  import java.util.concurrent.Future;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicInteger;
28  
29  import org.eclipse.jetty.spdy.api.DataInfo;
30  import org.eclipse.jetty.spdy.api.Handler;
31  import org.eclipse.jetty.spdy.api.HeadersInfo;
32  import org.eclipse.jetty.spdy.api.ReplyInfo;
33  import org.eclipse.jetty.spdy.api.RstInfo;
34  import org.eclipse.jetty.spdy.api.Stream;
35  import org.eclipse.jetty.spdy.api.StreamFrameListener;
36  import org.eclipse.jetty.spdy.api.StreamStatus;
37  import org.eclipse.jetty.spdy.api.SynInfo;
38  import org.eclipse.jetty.spdy.frames.ControlFrame;
39  import org.eclipse.jetty.spdy.frames.HeadersFrame;
40  import org.eclipse.jetty.spdy.frames.SynReplyFrame;
41  import org.eclipse.jetty.util.log.Log;
42  import org.eclipse.jetty.util.log.Logger;
43  
44  public class StandardStream implements IStream
45  {
46      private static final Logger logger = Log.getLogger(Stream.class);
47      private final Map<String, Object> attributes = new ConcurrentHashMap<>();
48      private final int id;
49      private final byte priority;
50      private final ISession session;
51      private final IStream associatedStream;
52      private final AtomicInteger windowSize = new AtomicInteger();
53      private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>());
54      private volatile StreamFrameListener listener;
55      private volatile OpenState openState = OpenState.SYN_SENT;
56      private volatile CloseState closeState = CloseState.OPENED;
57      private volatile boolean reset = false;
58  
59      public StandardStream(int id, byte priority, ISession session, IStream associatedStream)
60      {
61          this.id = id;
62          this.priority = priority;
63          this.session = session;
64          this.associatedStream = associatedStream;
65      }
66  
67      @Override
68      public int getId()
69      {
70          return id;
71      }
72  
73      @Override
74      public IStream getAssociatedStream()
75      {
76          return associatedStream;
77      }
78  
79      @Override
80      public Set<Stream> getPushedStreams()
81      {
82          return pushedStreams;
83      }
84  
85      @Override
86      public void associate(IStream stream)
87      {
88          pushedStreams.add(stream);
89      }
90  
91      @Override
92      public void disassociate(IStream stream)
93      {
94          pushedStreams.remove(stream);
95      }
96  
97      @Override
98      public byte getPriority()
99      {
100         return priority;
101     }
102 
103     @Override
104     public int getWindowSize()
105     {
106         return windowSize.get();
107     }
108 
109     @Override
110     public void updateWindowSize(int delta)
111     {
112         int size = windowSize.addAndGet(delta);
113         logger.debug("Updated window size {} -> {} for {}", size - delta, size, this);
114     }
115 
116     @Override
117     public ISession getSession()
118     {
119         return session;
120     }
121 
122     @Override
123     public Object getAttribute(String key)
124     {
125         return attributes.get(key);
126     }
127 
128     @Override
129     public void setAttribute(String key, Object value)
130     {
131         attributes.put(key,value);
132     }
133 
134     @Override
135     public Object removeAttribute(String key)
136     {
137         return attributes.remove(key);
138     }
139 
140     @Override
141     public void setStreamFrameListener(StreamFrameListener listener)
142     {
143         this.listener = listener;
144     }
145 
146     public StreamFrameListener getStreamFrameListener()
147     {
148         return listener;
149     }
150 
151     @Override
152     public void updateCloseState(boolean close, boolean local)
153     {
154         if (close)
155         {
156             switch (closeState)
157             {
158                 case OPENED:
159                 {
160                     closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
161                     break;
162                 }
163                 case LOCALLY_CLOSED:
164                 {
165                     if (local)
166                         throw new IllegalStateException();
167                     else
168                         closeState = CloseState.CLOSED;
169                     break;
170                 }
171                 case REMOTELY_CLOSED:
172                 {
173                     if (local)
174                         closeState = CloseState.CLOSED;
175                     else
176                         throw new IllegalStateException();
177                     break;
178                 }
179                 default:
180                 {
181                     throw new IllegalStateException();
182                 }
183             }
184         }
185     }
186 
187     @Override
188     public void process(ControlFrame frame)
189     {
190         switch (frame.getType())
191         {
192             case SYN_STREAM:
193             {
194                 openState = OpenState.SYN_RECV;
195                 break;
196             }
197             case SYN_REPLY:
198             {
199                 openState = OpenState.REPLY_RECV;
200                 SynReplyFrame synReply = (SynReplyFrame)frame;
201                 updateCloseState(synReply.isClose(), false);
202                 ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(), synReply.isClose());
203                 notifyOnReply(replyInfo);
204                 break;
205             }
206             case HEADERS:
207             {
208                 HeadersFrame headers = (HeadersFrame)frame;
209                 updateCloseState(headers.isClose(), false);
210                 HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(), headers.isClose(), headers.isResetCompression());
211                 notifyOnHeaders(headersInfo);
212                 break;
213             }
214             case RST_STREAM:
215             {
216                 reset = true;
217                 break;
218             }
219             default:
220             {
221                 throw new IllegalStateException();
222             }
223         }
224         session.flush();
225     }
226 
227     @Override
228     public void process(DataInfo dataInfo)
229     {
230         // TODO: in v3 we need to send a rst instead of just ignoring
231         // ignore data frame if this stream is remotelyClosed already
232         if (isRemotelyClosed())
233         {
234             logger.debug("Stream is remotely closed, ignoring {}", dataInfo);
235             return;
236         }
237 
238         if (!canReceive())
239         {
240             logger.debug("Protocol error receiving {}, resetting" + dataInfo);
241             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
242             return;
243         }
244 
245         updateCloseState(dataInfo.isClose(), false);
246         notifyOnData(dataInfo);
247         session.flush();
248     }
249 
250     private void notifyOnReply(ReplyInfo replyInfo)
251     {
252         final StreamFrameListener listener = this.listener;
253         try
254         {
255             if (listener != null)
256             {
257                 logger.debug("Invoking reply callback with {} on listener {}", replyInfo, listener);
258                 listener.onReply(this, replyInfo);
259             }
260         }
261         catch (Exception x)
262         {
263             logger.info("Exception while notifying listener " + listener, x);
264         }
265         catch (Error x)
266         {
267             logger.info("Exception while notifying listener " + listener, x);
268             throw x;
269         }
270     }
271 
272     private void notifyOnHeaders(HeadersInfo headersInfo)
273     {
274         final StreamFrameListener listener = this.listener;
275         try
276         {
277             if (listener != null)
278             {
279                 logger.debug("Invoking headers callback with {} on listener {}", headersInfo, listener);
280                 listener.onHeaders(this, headersInfo);
281             }
282         }
283         catch (Exception x)
284         {
285             logger.info("Exception while notifying listener " + listener, x);
286         }
287         catch (Error x)
288         {
289             logger.info("Exception while notifying listener " + listener, x);
290             throw x;
291         }
292     }
293 
294     private void notifyOnData(DataInfo dataInfo)
295     {
296         final StreamFrameListener listener = this.listener;
297         try
298         {
299             if (listener != null)
300             {
301                 logger.debug("Invoking data callback with {} on listener {}", dataInfo, listener);
302                 listener.onData(this, dataInfo);
303                 logger.debug("Invoked data callback with {} on listener {}", dataInfo, listener);
304             }
305         }
306         catch (Exception x)
307         {
308             logger.info("Exception while notifying listener " + listener, x);
309         }
310         catch (Error x)
311         {
312             logger.info("Exception while notifying listener " + listener, x);
313             throw x;
314         }
315     }
316 
317     @Override
318     public Future<Stream> syn(SynInfo synInfo)
319     {
320         Promise<Stream> result = new Promise<>();
321         syn(synInfo,0,TimeUnit.MILLISECONDS,result);
322         return result;
323     }
324 
325     @Override
326     public void syn(SynInfo synInfo, long timeout, TimeUnit unit, Handler<Stream> handler)
327     {
328         if (isClosed() || isReset())
329         {
330             handler.failed(this, new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED));
331             return;
332         }
333         PushSynInfo pushSynInfo = new PushSynInfo(getId(), synInfo);
334         session.syn(pushSynInfo, null, timeout, unit, handler);
335     }
336 
337     @Override
338     public Future<Void> reply(ReplyInfo replyInfo)
339     {
340         Promise<Void> result = new Promise<>();
341         reply(replyInfo,0,TimeUnit.MILLISECONDS,result);
342         return result;
343     }
344 
345     @Override
346     public void reply(ReplyInfo replyInfo, long timeout, TimeUnit unit, Handler<Void> handler)
347     {
348         if (isUnidirectional())
349             throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
350         openState = OpenState.REPLY_SENT;
351         updateCloseState(replyInfo.isClose(), true);
352         SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
353         session.control(this, frame, timeout, unit, handler, null);
354     }
355 
356     @Override
357     public Future<Void> data(DataInfo dataInfo)
358     {
359         Promise<Void> result = new Promise<>();
360         data(dataInfo,0,TimeUnit.MILLISECONDS,result);
361         return result;
362     }
363 
364     @Override
365     public void data(DataInfo dataInfo, long timeout, TimeUnit unit, Handler<Void> handler)
366     {
367         if (!canSend())
368         {
369             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
370             throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame");
371         }
372         if (isLocallyClosed())
373         {
374             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
375             throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a closed stream");
376         }
377 
378         // Cannot update the close state here, because the data that we send may
379         // be flow controlled, so we need the stream to update the window size.
380         session.data(this, dataInfo, timeout, unit, handler, null);
381     }
382 
383     @Override
384     public Future<Void> headers(HeadersInfo headersInfo)
385     {
386         Promise<Void> result = new Promise<>();
387         headers(headersInfo,0,TimeUnit.MILLISECONDS,result);
388         return result;
389     }
390 
391     @Override
392     public void headers(HeadersInfo headersInfo, long timeout, TimeUnit unit, Handler<Void> handler)
393     {
394         if (!canSend())
395         {
396             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
397             throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame");
398         }
399         if (isLocallyClosed())
400         {
401             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR));
402             throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream");
403         }
404 
405         updateCloseState(headersInfo.isClose(), true);
406         HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
407         session.control(this, frame, timeout, unit, handler, null);
408     }
409 
410     @Override
411     public boolean isUnidirectional()
412     {
413         return associatedStream != null;
414     }
415 
416     @Override
417     public boolean isReset()
418     {
419         return reset;
420     }
421 
422     @Override
423     public boolean isHalfClosed()
424     {
425         CloseState closeState = this.closeState;
426         return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
427     }
428 
429     @Override
430     public boolean isClosed()
431     {
432         return closeState == CloseState.CLOSED;
433     }
434 
435     private boolean isLocallyClosed()
436     {
437         CloseState closeState = this.closeState;
438         return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.CLOSED;
439     }
440 
441     private boolean isRemotelyClosed()
442     {
443         CloseState closeState = this.closeState;
444         return closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
445     }
446 
447     @Override
448     public String toString()
449     {
450         return String.format("stream=%d v%d windowSize=%db reset=%s %s %s", getId(), session.getVersion(), getWindowSize(), isReset(), openState, closeState);
451     }
452 
453     private boolean canSend()
454     {
455         OpenState openState = this.openState;
456         return openState == OpenState.SYN_SENT || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
457     }
458 
459     private boolean canReceive()
460     {
461         OpenState openState = this.openState;
462         return openState == OpenState.SYN_RECV || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
463     }
464 
465     private enum OpenState
466     {
467         SYN_SENT, SYN_RECV, REPLY_SENT, REPLY_RECV
468     }
469 
470     private enum CloseState
471     {
472         OPENED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
473     }
474 }