View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy;
20  
21  import java.util.Collections;
22  import java.util.Map;
23  import java.util.Set;
24  import java.util.concurrent.ConcurrentHashMap;
25  import java.util.concurrent.ExecutionException;
26  import java.util.concurrent.TimeoutException;
27  import java.util.concurrent.atomic.AtomicInteger;
28  
29  import org.eclipse.jetty.io.IdleTimeout;
30  import org.eclipse.jetty.spdy.api.DataInfo;
31  import org.eclipse.jetty.spdy.api.HeadersInfo;
32  import org.eclipse.jetty.spdy.api.PushInfo;
33  import org.eclipse.jetty.spdy.api.ReplyInfo;
34  import org.eclipse.jetty.spdy.api.RstInfo;
35  import org.eclipse.jetty.spdy.api.Stream;
36  import org.eclipse.jetty.spdy.api.StreamFrameListener;
37  import org.eclipse.jetty.spdy.api.StreamStatus;
38  import org.eclipse.jetty.spdy.frames.ControlFrame;
39  import org.eclipse.jetty.spdy.frames.HeadersFrame;
40  import org.eclipse.jetty.spdy.frames.SynReplyFrame;
41  import org.eclipse.jetty.util.Callback;
42  import org.eclipse.jetty.util.FutureCallback;
43  import org.eclipse.jetty.util.FuturePromise;
44  import org.eclipse.jetty.util.Promise;
45  import org.eclipse.jetty.util.log.Log;
46  import org.eclipse.jetty.util.log.Logger;
47  import org.eclipse.jetty.util.thread.Scheduler;
48  
49  public class StandardStream extends IdleTimeout implements IStream
50  {
51      private static final Logger LOG = Log.getLogger(Stream.class);
52      private final Map<String, Object> attributes = new ConcurrentHashMap<>();
53      private final int id;
54      private final byte priority;
55      private final ISession session;
56      private final IStream associatedStream;
57      private final Promise<Stream> promise;
58      private final AtomicInteger windowSize = new AtomicInteger();
59      private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>());
60      private volatile StreamFrameListener listener;
61      private volatile OpenState openState = OpenState.SYN_SENT;
62      private volatile CloseState closeState = CloseState.OPENED;
63      private volatile boolean reset = false;
64  
65      public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Scheduler scheduler, Promise<Stream> promise)
66      {
67          super(scheduler);
68          this.id = id;
69          this.priority = priority;
70          this.session = session;
71          this.associatedStream = associatedStream;
72          this.promise = promise;
73      }
74  
75      @Override
76      public int getId()
77      {
78          return id;
79      }
80  
81      @Override
82      public IStream getAssociatedStream()
83      {
84          return associatedStream;
85      }
86  
87      @Override
88      public Set<Stream> getPushedStreams()
89      {
90          return pushedStreams;
91      }
92  
93      @Override
94      public void associate(IStream stream)
95      {
96          pushedStreams.add(stream);
97      }
98  
99      @Override
100     public void disassociate(IStream stream)
101     {
102         pushedStreams.remove(stream);
103     }
104 
105     @Override
106     public byte getPriority()
107     {
108         return priority;
109     }
110 
111     @Override
112     protected void onIdleExpired(TimeoutException timeout)
113     {
114         StreamFrameListener listener = this.listener;
115         if (listener != null)
116             listener.onFailure(this, timeout);
117         // The stream is now gone, we must close it to
118         // avoid that its idle timeout is rescheduled.
119         close();
120     }
121 
122     private void close()
123     {
124         closeState = CloseState.CLOSED;
125         onClose();
126     }
127 
128     @Override
129     public boolean isOpen()
130     {
131         return !isClosed();
132     }
133 
134     @Override
135     public int getWindowSize()
136     {
137         return windowSize.get();
138     }
139 
140     @Override
141     public void updateWindowSize(int delta)
142     {
143         int size = windowSize.addAndGet(delta);
144         if (LOG.isDebugEnabled())
145             LOG.debug("Updated window size {} -> {} for {}", size - delta, size, this);
146     }
147 
148     @Override
149     public ISession getSession()
150     {
151         return session;
152     }
153 
154     @Override
155     public Object getAttribute(String key)
156     {
157         return attributes.get(key);
158     }
159 
160     @Override
161     public void setAttribute(String key, Object value)
162     {
163         attributes.put(key, value);
164     }
165 
166     @Override
167     public Object removeAttribute(String key)
168     {
169         return attributes.remove(key);
170     }
171 
172     @Override
173     public void setStreamFrameListener(StreamFrameListener listener)
174     {
175         this.listener = listener;
176     }
177 
178     @Override
179     public StreamFrameListener getStreamFrameListener()
180     {
181         return listener;
182     }
183 
184     @Override
185     public void updateCloseState(boolean close, boolean local)
186     {
187         if (LOG.isDebugEnabled())
188             LOG.debug("{} close={} local={}", this, close, local);
189         if (close)
190         {
191             switch (closeState)
192             {
193                 case OPENED:
194                 {
195                     closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
196                     break;
197                 }
198                 case LOCALLY_CLOSED:
199                 {
200                     if (local)
201                         throw new IllegalStateException();
202                     else
203                         close();
204                     break;
205                 }
206                 case REMOTELY_CLOSED:
207                 {
208                     if (local)
209                         close();
210                     else
211                         throw new IllegalStateException();
212                     break;
213                 }
214                 default:
215                 {
216                     LOG.warn("Already CLOSED! {} local={}", this, local);
217                 }
218             }
219         }
220     }
221 
222     @Override
223     public void process(ControlFrame frame)
224     {
225         notIdle();
226         switch (frame.getType())
227         {
228             case SYN_STREAM:
229             {
230                 openState = OpenState.SYN_RECV;
231                 break;
232             }
233             case SYN_REPLY:
234             {
235                 openState = OpenState.REPLY_RECV;
236                 SynReplyFrame synReply = (SynReplyFrame)frame;
237                 updateCloseState(synReply.isClose(), false);
238                 ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(), synReply.isClose());
239                 notifyOnReply(replyInfo);
240                 break;
241             }
242             case HEADERS:
243             {
244                 HeadersFrame headers = (HeadersFrame)frame;
245                 updateCloseState(headers.isClose(), false);
246                 HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(), headers.isClose(), headers.isResetCompression());
247                 notifyOnHeaders(headersInfo);
248                 break;
249             }
250             case RST_STREAM:
251             {
252                 reset = true;
253                 break;
254             }
255             default:
256             {
257                 throw new IllegalStateException();
258             }
259         }
260     }
261 
262     @Override
263     public void process(DataInfo dataInfo)
264     {
265         notIdle();
266         // TODO: in v3 we need to send a rst instead of just ignoring
267         // ignore data frame if this stream is remotelyClosed already
268         if (isRemotelyClosed())
269         {
270             if (LOG.isDebugEnabled())
271                 LOG.debug("Stream is remotely closed, ignoring {}", dataInfo);
272             return;
273         }
274 
275         if (!canReceive())
276         {
277             if (LOG.isDebugEnabled())
278                 LOG.debug("Protocol error receiving {}, resetting", dataInfo);
279             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), Callback.Adapter.INSTANCE);
280             return;
281         }
282 
283         updateCloseState(dataInfo.isClose(), false);
284         notifyOnData(dataInfo);
285     }
286 
287     @Override
288     public void succeeded()
289     {
290         if (promise != null)
291             promise.succeeded(this);
292     }
293 
294     @Override
295     public void failed(Throwable x)
296     {
297         if (promise != null)
298             promise.failed(x);
299     }
300 
301     private void notifyOnReply(ReplyInfo replyInfo)
302     {
303         final StreamFrameListener listener = this.listener;
304         try
305         {
306             if (listener != null)
307             {
308                 if (LOG.isDebugEnabled())
309                     LOG.debug("Invoking reply callback with {} on listener {}", replyInfo, listener);
310                 listener.onReply(this, replyInfo);
311             }
312         }
313         catch (Exception x)
314         {
315             LOG.info("Exception while notifying listener " + listener, x);
316         }
317         catch (Error x)
318         {
319             LOG.info("Exception while notifying listener " + listener, x);
320             throw x;
321         }
322     }
323 
324     private void notifyOnHeaders(HeadersInfo headersInfo)
325     {
326         final StreamFrameListener listener = this.listener;
327         try
328         {
329             if (listener != null)
330             {
331                 if (LOG.isDebugEnabled())
332                     LOG.debug("Invoking headers callback with {} on listener {}", headersInfo, listener);
333                 listener.onHeaders(this, headersInfo);
334             }
335         }
336         catch (Exception x)
337         {
338             LOG.info("Exception while notifying listener " + listener, x);
339         }
340         catch (Error x)
341         {
342             LOG.info("Exception while notifying listener " + listener, x);
343             throw x;
344         }
345     }
346 
347     private void notifyOnData(DataInfo dataInfo)
348     {
349         final StreamFrameListener listener = this.listener;
350         try
351         {
352             if (listener != null)
353             {
354                 if (LOG.isDebugEnabled())
355                     LOG.debug("Invoking data callback with {} on listener {}", dataInfo, listener);
356                 listener.onData(this, dataInfo);
357                 if (LOG.isDebugEnabled())
358                     LOG.debug("Invoked data callback with {} on listener {}", dataInfo, listener);
359             }
360         }
361         catch (Exception x)
362         {
363             LOG.info("Exception while notifying listener " + listener, x);
364         }
365         catch (Error x)
366         {
367             LOG.info("Exception while notifying listener " + listener, x);
368             throw x;
369         }
370     }
371 
372     @Override
373     public Stream push(PushInfo pushInfo) throws InterruptedException, ExecutionException, TimeoutException
374     {
375         FuturePromise<Stream> result = new FuturePromise<>();
376         push(pushInfo, result);
377         if (pushInfo.getTimeout() > 0)
378             return result.get(pushInfo.getTimeout(), pushInfo.getUnit());
379         else
380             return result.get();
381     }
382 
383     @Override
384     public void push(PushInfo pushInfo, Promise<Stream> promise)
385     {
386         notIdle();
387         if (isClosed() || isReset())
388         {
389             close();
390             promise.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED,
391                     "Stream: " + this + " already closed or reset!"));
392             return;
393         }
394         PushSynInfo pushSynInfo = new PushSynInfo(getId(), pushInfo);
395         session.syn(pushSynInfo, null, new StreamPromise(promise));
396     }
397 
398     @Override
399     public void reply(ReplyInfo replyInfo) throws InterruptedException, ExecutionException, TimeoutException
400     {
401         FutureCallback result = new FutureCallback();
402         reply(replyInfo, result);
403         if (replyInfo.getTimeout() > 0)
404             result.get(replyInfo.getTimeout(), replyInfo.getUnit());
405         else
406             result.get();
407     }
408 
409     @Override
410     public void reply(ReplyInfo replyInfo, Callback callback)
411     {
412         notIdle();
413         if (isUnidirectional())
414         {
415             close();
416             throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
417         }
418         openState = OpenState.REPLY_SENT;
419         updateCloseState(replyInfo.isClose(), true);
420         SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
421         session.control(this, frame, replyInfo.getTimeout(), replyInfo.getUnit(), new StreamCallback(callback));
422     }
423 
424     @Override
425     public void data(DataInfo dataInfo) throws InterruptedException, ExecutionException, TimeoutException
426     {
427         FutureCallback result = new FutureCallback();
428         data(dataInfo, result);
429         if (dataInfo.getTimeout() > 0)
430             result.get(dataInfo.getTimeout(), dataInfo.getUnit());
431         else
432             result.get();
433     }
434 
435     @Override
436     public void data(DataInfo dataInfo, Callback callback)
437     {
438         notIdle();
439         if (!canSend())
440         {
441             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback());
442             throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame");
443         }
444         if (isLocallyClosed())
445         {
446             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback());
447             throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a locally closed stream");
448         }
449 
450         // Cannot update the close state here, because the data that we send may
451         // be flow controlled, so we need the stream to update the window size.
452         session.data(this, dataInfo, dataInfo.getTimeout(), dataInfo.getUnit(), new StreamCallback(callback));
453     }
454 
455     @Override
456     public void headers(HeadersInfo headersInfo) throws InterruptedException, ExecutionException, TimeoutException
457     {
458         FutureCallback result = new FutureCallback();
459         headers(headersInfo, result);
460         if (headersInfo.getTimeout() > 0)
461             result.get(headersInfo.getTimeout(), headersInfo.getUnit());
462         else
463             result.get();
464     }
465 
466     @Override
467     public void headers(HeadersInfo headersInfo, Callback callback)
468     {
469         notIdle();
470         if (!canSend())
471         {
472             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback());
473             throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame");
474         }
475         if (isLocallyClosed())
476         {
477             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new StreamCallback());
478             throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream");
479         }
480 
481         updateCloseState(headersInfo.isClose(), true);
482         HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
483         session.control(this, frame, headersInfo.getTimeout(), headersInfo.getUnit(), new StreamCallback(callback));
484     }
485 
486     @Override
487     public boolean isUnidirectional()
488     {
489         return associatedStream != null;
490     }
491 
492     @Override
493     public boolean isReset()
494     {
495         return reset;
496     }
497 
498     @Override
499     public boolean isHalfClosed()
500     {
501         CloseState closeState = this.closeState;
502         return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
503     }
504 
505     @Override
506     public boolean isClosed()
507     {
508         return closeState == CloseState.CLOSED;
509     }
510 
511     private boolean isLocallyClosed()
512     {
513         CloseState closeState = this.closeState;
514         return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.CLOSED;
515     }
516 
517     private boolean isRemotelyClosed()
518     {
519         CloseState closeState = this.closeState;
520         return closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
521     }
522 
523     @Override
524     public String toString()
525     {
526         return String.format("stream=%d v%d windowSize=%d reset=%s prio=%d %s %s", getId(), session.getVersion(),
527                 getWindowSize(), isReset(), priority, openState, closeState);
528     }
529 
530     private boolean canSend()
531     {
532         OpenState openState = this.openState;
533         return openState == OpenState.SYN_SENT || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
534     }
535 
536     private boolean canReceive()
537     {
538         OpenState openState = this.openState;
539         return openState == OpenState.SYN_RECV || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
540     }
541 
542     private enum OpenState
543     {
544         SYN_SENT, SYN_RECV, REPLY_SENT, REPLY_RECV
545     }
546 
547     private enum CloseState
548     {
549         OPENED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
550     }
551 
552     private class StreamCallback implements Callback
553     {
554         private final Callback callback;
555 
556         private StreamCallback()
557         {
558             this(Callback.Adapter.INSTANCE);
559         }
560 
561         private StreamCallback(Callback callback)
562         {
563             this.callback = callback;
564         }
565 
566         @Override
567         public void succeeded()
568         {
569             callback.succeeded();
570         }
571 
572         @Override
573         public void failed(Throwable x)
574         {
575             close();
576             callback.failed(x);
577         }
578     }
579 
580     private class StreamPromise implements Promise<Stream>
581     {
582         private final Promise<Stream> promise;
583 
584         public StreamPromise(Promise<Stream> promise)
585         {
586             this.promise = promise;
587         }
588 
589         @Override
590         public void succeeded(Stream result)
591         {
592             promise.succeeded(result);
593         }
594 
595         @Override
596         public void failed(Throwable x)
597         {
598             close();
599             promise.failed(x);
600         }
601     }
602 }