View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy;
20  
21  import java.util.Collections;
22  import java.util.Map;
23  import java.util.Set;
24  import java.util.concurrent.ConcurrentHashMap;
25  import java.util.concurrent.ExecutionException;
26  import java.util.concurrent.TimeoutException;
27  import java.util.concurrent.atomic.AtomicInteger;
28  
29  import org.eclipse.jetty.spdy.api.DataInfo;
30  import org.eclipse.jetty.spdy.api.HeadersInfo;
31  import org.eclipse.jetty.spdy.api.PushInfo;
32  import org.eclipse.jetty.spdy.api.ReplyInfo;
33  import org.eclipse.jetty.spdy.api.RstInfo;
34  import org.eclipse.jetty.spdy.api.Stream;
35  import org.eclipse.jetty.spdy.api.StreamFrameListener;
36  import org.eclipse.jetty.spdy.api.StreamStatus;
37  import org.eclipse.jetty.spdy.frames.ControlFrame;
38  import org.eclipse.jetty.spdy.frames.HeadersFrame;
39  import org.eclipse.jetty.spdy.frames.SynReplyFrame;
40  import org.eclipse.jetty.util.Callback;
41  import org.eclipse.jetty.util.FutureCallback;
42  import org.eclipse.jetty.util.FuturePromise;
43  import org.eclipse.jetty.util.Promise;
44  import org.eclipse.jetty.util.log.Log;
45  import org.eclipse.jetty.util.log.Logger;
46  
47  public class StandardStream implements IStream
48  {
49      private static final Logger LOG = Log.getLogger(Stream.class);
50      private final Map<String, Object> attributes = new ConcurrentHashMap<>();
51      private final int id;
52      private final byte priority;
53      private final ISession session;
54      private final IStream associatedStream;
55      private final Promise<Stream> promise;
56      private final AtomicInteger windowSize = new AtomicInteger();
57      private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>());
58      private volatile StreamFrameListener listener;
59      private volatile OpenState openState = OpenState.SYN_SENT;
60      private volatile CloseState closeState = CloseState.OPENED;
61      private volatile boolean reset = false;
62  
63      public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Promise<Stream> promise)
64      {
65          this.id = id;
66          this.priority = priority;
67          this.session = session;
68          this.associatedStream = associatedStream;
69          this.promise = promise;
70      }
71  
72      @Override
73      public int getId()
74      {
75          return id;
76      }
77  
78      @Override
79      public IStream getAssociatedStream()
80      {
81          return associatedStream;
82      }
83  
84      @Override
85      public Set<Stream> getPushedStreams()
86      {
87          return pushedStreams;
88      }
89  
90      @Override
91      public void associate(IStream stream)
92      {
93          pushedStreams.add(stream);
94      }
95  
96      @Override
97      public void disassociate(IStream stream)
98      {
99          pushedStreams.remove(stream);
100     }
101 
102     @Override
103     public byte getPriority()
104     {
105         return priority;
106     }
107 
108     @Override
109     public int getWindowSize()
110     {
111         return windowSize.get();
112     }
113 
114     @Override
115     public void updateWindowSize(int delta)
116     {
117         int size = windowSize.addAndGet(delta);
118         LOG.debug("Updated window size {} -> {} for {}", size - delta, size, this);
119     }
120 
121     @Override
122     public ISession getSession()
123     {
124         return session;
125     }
126 
127     @Override
128     public Object getAttribute(String key)
129     {
130         return attributes.get(key);
131     }
132 
133     @Override
134     public void setAttribute(String key, Object value)
135     {
136         attributes.put(key, value);
137     }
138 
139     @Override
140     public Object removeAttribute(String key)
141     {
142         return attributes.remove(key);
143     }
144 
145     @Override
146     public void setStreamFrameListener(StreamFrameListener listener)
147     {
148         this.listener = listener;
149     }
150 
151     @Override
152     public StreamFrameListener getStreamFrameListener()
153     {
154         return listener;
155     }
156 
157     @Override
158     public void updateCloseState(boolean close, boolean local)
159     {
160         LOG.debug("{} close={} local={}", this, close, local);
161         if (close)
162         {
163             switch (closeState)
164             {
165                 case OPENED:
166                 {
167                     closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
168                     break;
169                 }
170                 case LOCALLY_CLOSED:
171                 {
172                     if (local)
173                         throw new IllegalStateException();
174                     else
175                         closeState = CloseState.CLOSED;
176                     break;
177                 }
178                 case REMOTELY_CLOSED:
179                 {
180                     if (local)
181                         closeState = CloseState.CLOSED;
182                     else
183                         throw new IllegalStateException();
184                     break;
185                 }
186                 default:
187                 {
188                     LOG.warn("Already CLOSED! {} local={}", this, local);
189                 }
190             }
191         }
192     }
193 
194     @Override
195     public void process(ControlFrame frame)
196     {
197         switch (frame.getType())
198         {
199             case SYN_STREAM:
200             {
201                 openState = OpenState.SYN_RECV;
202                 break;
203             }
204             case SYN_REPLY:
205             {
206                 openState = OpenState.REPLY_RECV;
207                 SynReplyFrame synReply = (SynReplyFrame)frame;
208                 updateCloseState(synReply.isClose(), false);
209                 ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(), synReply.isClose());
210                 notifyOnReply(replyInfo);
211                 break;
212             }
213             case HEADERS:
214             {
215                 HeadersFrame headers = (HeadersFrame)frame;
216                 updateCloseState(headers.isClose(), false);
217                 HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(), headers.isClose(), headers.isResetCompression());
218                 notifyOnHeaders(headersInfo);
219                 break;
220             }
221             case RST_STREAM:
222             {
223                 reset = true;
224                 break;
225             }
226             default:
227             {
228                 throw new IllegalStateException();
229             }
230         }
231         session.flush();
232     }
233 
234     @Override
235     public void process(DataInfo dataInfo)
236     {
237         // TODO: in v3 we need to send a rst instead of just ignoring
238         // ignore data frame if this stream is remotelyClosed already
239         if (isRemotelyClosed())
240         {
241             LOG.debug("Stream is remotely closed, ignoring {}", dataInfo);
242             return;
243         }
244 
245         if (!canReceive())
246         {
247             LOG.debug("Protocol error receiving {}, resetting", dataInfo);
248             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
249             return;
250         }
251 
252         updateCloseState(dataInfo.isClose(), false);
253         notifyOnData(dataInfo);
254         session.flush();
255     }
256 
257     @Override
258     public void succeeded()
259     {
260         if (promise != null)
261             promise.succeeded(this);
262     }
263 
264     @Override
265     public void failed(Throwable x)
266     {
267         if (promise != null)
268             promise.failed(x);
269     }
270 
271     private void notifyOnReply(ReplyInfo replyInfo)
272     {
273         final StreamFrameListener listener = this.listener;
274         try
275         {
276             if (listener != null)
277             {
278                 LOG.debug("Invoking reply callback with {} on listener {}", replyInfo, listener);
279                 listener.onReply(this, replyInfo);
280             }
281         }
282         catch (Exception x)
283         {
284             LOG.info("Exception while notifying listener " + listener, x);
285         }
286         catch (Error x)
287         {
288             LOG.info("Exception while notifying listener " + listener, x);
289             throw x;
290         }
291     }
292 
293     private void notifyOnHeaders(HeadersInfo headersInfo)
294     {
295         final StreamFrameListener listener = this.listener;
296         try
297         {
298             if (listener != null)
299             {
300                 LOG.debug("Invoking headers callback with {} on listener {}", headersInfo, listener);
301                 listener.onHeaders(this, headersInfo);
302             }
303         }
304         catch (Exception x)
305         {
306             LOG.info("Exception while notifying listener " + listener, x);
307         }
308         catch (Error x)
309         {
310             LOG.info("Exception while notifying listener " + listener, x);
311             throw x;
312         }
313     }
314 
315     private void notifyOnData(DataInfo dataInfo)
316     {
317         final StreamFrameListener listener = this.listener;
318         try
319         {
320             if (listener != null)
321             {
322                 LOG.debug("Invoking data callback with {} on listener {}", dataInfo, listener);
323                 listener.onData(this, dataInfo);
324                 LOG.debug("Invoked data callback with {} on listener {}", dataInfo, listener);
325             }
326         }
327         catch (Exception x)
328         {
329             LOG.info("Exception while notifying listener " + listener, x);
330         }
331         catch (Error x)
332         {
333             LOG.info("Exception while notifying listener " + listener, x);
334             throw x;
335         }
336     }
337 
338     @Override
339     public Stream push(PushInfo pushInfo) throws InterruptedException, ExecutionException, TimeoutException
340     {
341         FuturePromise<Stream> result = new FuturePromise<>();
342         push(pushInfo, result);
343         if (pushInfo.getTimeout() > 0)
344             return result.get(pushInfo.getTimeout(), pushInfo.getUnit());
345         else
346             return result.get();
347     }
348 
349     @Override
350     public void push(PushInfo pushInfo, Promise<Stream> promise)
351     {
352         if (isClosed() || isReset())
353         {
354             promise.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED,
355                     "Stream: " + this + " already closed or reset!"));
356             return;
357         }
358         PushSynInfo pushSynInfo = new PushSynInfo(getId(), pushInfo);
359         session.syn(pushSynInfo, null, promise);
360     }
361 
362     @Override
363     public void reply(ReplyInfo replyInfo) throws InterruptedException, ExecutionException, TimeoutException
364     {
365         FutureCallback result = new FutureCallback();
366         reply(replyInfo, result);
367         if (replyInfo.getTimeout() > 0)
368             result.get(replyInfo.getTimeout(), replyInfo.getUnit());
369         else
370             result.get();
371     }
372 
373     @Override
374     public void reply(ReplyInfo replyInfo, Callback callback)
375     {
376         if (isUnidirectional())
377             throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
378         openState = OpenState.REPLY_SENT;
379         updateCloseState(replyInfo.isClose(), true);
380         SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
381         session.control(this, frame, replyInfo.getTimeout(), replyInfo.getUnit(), callback);
382     }
383 
384     @Override
385     public void data(DataInfo dataInfo) throws InterruptedException, ExecutionException, TimeoutException
386     {
387         FutureCallback result = new FutureCallback();
388         data(dataInfo, result);
389         if (dataInfo.getTimeout() > 0)
390             result.get(dataInfo.getTimeout(), dataInfo.getUnit());
391         else
392             result.get();
393     }
394 
395     @Override
396     public void data(DataInfo dataInfo, Callback callback)
397     {
398         if (!canSend())
399         {
400             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
401             throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame");
402         }
403         if (isLocallyClosed())
404         {
405             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
406             throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a locally closed stream");
407         }
408 
409         // Cannot update the close state here, because the data that we send may
410         // be flow controlled, so we need the stream to update the window size.
411         session.data(this, dataInfo, dataInfo.getTimeout(), dataInfo.getUnit(), callback);
412     }
413 
414     @Override
415     public void headers(HeadersInfo headersInfo) throws InterruptedException, ExecutionException, TimeoutException
416     {
417         FutureCallback result = new FutureCallback();
418         headers(headersInfo, result);
419         if (headersInfo.getTimeout() > 0)
420             result.get(headersInfo.getTimeout(), headersInfo.getUnit());
421         else
422             result.get();
423     }
424 
425     @Override
426     public void headers(HeadersInfo headersInfo, Callback callback)
427     {
428         if (!canSend())
429         {
430             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
431             throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame");
432         }
433         if (isLocallyClosed())
434         {
435             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
436             throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream");
437         }
438 
439         updateCloseState(headersInfo.isClose(), true);
440         HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
441         session.control(this, frame, headersInfo.getTimeout(), headersInfo.getUnit(), callback);
442     }
443 
444     @Override
445     public boolean isUnidirectional()
446     {
447         return associatedStream != null;
448     }
449 
450     @Override
451     public boolean isReset()
452     {
453         return reset;
454     }
455 
456     @Override
457     public boolean isHalfClosed()
458     {
459         CloseState closeState = this.closeState;
460         return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
461     }
462 
463     @Override
464     public boolean isClosed()
465     {
466         return closeState == CloseState.CLOSED;
467     }
468 
469     private boolean isLocallyClosed()
470     {
471         CloseState closeState = this.closeState;
472         return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.CLOSED;
473     }
474 
475     private boolean isRemotelyClosed()
476     {
477         CloseState closeState = this.closeState;
478         return closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
479     }
480 
481     @Override
482     public String toString()
483     {
484         return String.format("stream=%d v%d windowSize=%d reset=%s prio=%d %s %s", getId(), session.getVersion(),
485                 getWindowSize(), isReset(), priority, openState, closeState);
486     }
487 
488     private boolean canSend()
489     {
490         OpenState openState = this.openState;
491         return openState == OpenState.SYN_SENT || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
492     }
493 
494     private boolean canReceive()
495     {
496         OpenState openState = this.openState;
497         return openState == OpenState.SYN_RECV || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
498     }
499 
500     private enum OpenState
501     {
502         SYN_SENT, SYN_RECV, REPLY_SENT, REPLY_RECV
503     }
504 
505     private enum CloseState
506     {
507         OPENED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
508     }
509 }