View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy;
20  
21  import java.util.Collections;
22  import java.util.Map;
23  import java.util.Set;
24  import java.util.concurrent.ConcurrentHashMap;
25  import java.util.concurrent.ExecutionException;
26  import java.util.concurrent.TimeoutException;
27  import java.util.concurrent.atomic.AtomicInteger;
28  
29  import org.eclipse.jetty.io.IdleTimeout;
30  import org.eclipse.jetty.spdy.api.DataInfo;
31  import org.eclipse.jetty.spdy.api.HeadersInfo;
32  import org.eclipse.jetty.spdy.api.PushInfo;
33  import org.eclipse.jetty.spdy.api.ReplyInfo;
34  import org.eclipse.jetty.spdy.api.RstInfo;
35  import org.eclipse.jetty.spdy.api.Stream;
36  import org.eclipse.jetty.spdy.api.StreamFrameListener;
37  import org.eclipse.jetty.spdy.api.StreamStatus;
38  import org.eclipse.jetty.spdy.frames.ControlFrame;
39  import org.eclipse.jetty.spdy.frames.HeadersFrame;
40  import org.eclipse.jetty.spdy.frames.SynReplyFrame;
41  import org.eclipse.jetty.util.Callback;
42  import org.eclipse.jetty.util.FutureCallback;
43  import org.eclipse.jetty.util.FuturePromise;
44  import org.eclipse.jetty.util.Promise;
45  import org.eclipse.jetty.util.log.Log;
46  import org.eclipse.jetty.util.log.Logger;
47  import org.eclipse.jetty.util.thread.Scheduler;
48  
49  public class StandardStream extends IdleTimeout implements IStream
50  {
51      private static final Logger LOG = Log.getLogger(Stream.class);
52      private final Map<String, Object> attributes = new ConcurrentHashMap<>();
53      private final int id;
54      private final byte priority;
55      private final ISession session;
56      private final IStream associatedStream;
57      private final Promise<Stream> promise;
58      private final AtomicInteger windowSize = new AtomicInteger();
59      private final Set<Stream> pushedStreams = Collections.newSetFromMap(new ConcurrentHashMap<Stream, Boolean>());
60      private volatile StreamFrameListener listener;
61      private volatile OpenState openState = OpenState.SYN_SENT;
62      private volatile CloseState closeState = CloseState.OPENED;
63      private volatile boolean reset = false;
64  
65      public StandardStream(int id, byte priority, ISession session, IStream associatedStream, Scheduler scheduler, Promise<Stream> promise)
66      {
67          super(scheduler);
68          this.id = id;
69          this.priority = priority;
70          this.session = session;
71          this.associatedStream = associatedStream;
72          this.promise = promise;
73      }
74  
75      @Override
76      public int getId()
77      {
78          return id;
79      }
80  
81      @Override
82      public IStream getAssociatedStream()
83      {
84          return associatedStream;
85      }
86  
87      @Override
88      public Set<Stream> getPushedStreams()
89      {
90          return pushedStreams;
91      }
92  
93      @Override
94      public void associate(IStream stream)
95      {
96          pushedStreams.add(stream);
97      }
98  
99      @Override
100     public void disassociate(IStream stream)
101     {
102         pushedStreams.remove(stream);
103     }
104 
105     @Override
106     public byte getPriority()
107     {
108         return priority;
109     }
110 
111     @Override
112     protected void onIdleExpired(TimeoutException timeout)
113     {
114         StreamFrameListener listener = this.listener;
115         if (listener != null)
116             listener.onFailure(this, timeout);
117     }
118 
119     @Override
120     public boolean isOpen()
121     {
122         return !isClosed();
123     }
124 
125     @Override
126     public int getWindowSize()
127     {
128         return windowSize.get();
129     }
130 
131     @Override
132     public void updateWindowSize(int delta)
133     {
134         int size = windowSize.addAndGet(delta);
135         LOG.debug("Updated window size {} -> {} for {}", size - delta, size, this);
136     }
137 
138     @Override
139     public ISession getSession()
140     {
141         return session;
142     }
143 
144     @Override
145     public Object getAttribute(String key)
146     {
147         return attributes.get(key);
148     }
149 
150     @Override
151     public void setAttribute(String key, Object value)
152     {
153         attributes.put(key, value);
154     }
155 
156     @Override
157     public Object removeAttribute(String key)
158     {
159         return attributes.remove(key);
160     }
161 
162     @Override
163     public void setStreamFrameListener(StreamFrameListener listener)
164     {
165         this.listener = listener;
166     }
167 
168     @Override
169     public StreamFrameListener getStreamFrameListener()
170     {
171         return listener;
172     }
173 
174     @Override
175     public void updateCloseState(boolean close, boolean local)
176     {
177         LOG.debug("{} close={} local={}", this, close, local);
178         if (close)
179         {
180             switch (closeState)
181             {
182                 case OPENED:
183                 {
184                     closeState = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
185                     break;
186                 }
187                 case LOCALLY_CLOSED:
188                 {
189                     if (local)
190                         throw new IllegalStateException();
191                     else
192                         closeState = CloseState.CLOSED;
193                     break;
194                 }
195                 case REMOTELY_CLOSED:
196                 {
197                     if (local)
198                         closeState = CloseState.CLOSED;
199                     else
200                         throw new IllegalStateException();
201                     break;
202                 }
203                 default:
204                 {
205                     LOG.warn("Already CLOSED! {} local={}", this, local);
206                 }
207             }
208         }
209     }
210 
211     @Override
212     public void process(ControlFrame frame)
213     {
214         notIdle();
215         switch (frame.getType())
216         {
217             case SYN_STREAM:
218             {
219                 openState = OpenState.SYN_RECV;
220                 break;
221             }
222             case SYN_REPLY:
223             {
224                 openState = OpenState.REPLY_RECV;
225                 SynReplyFrame synReply = (SynReplyFrame)frame;
226                 updateCloseState(synReply.isClose(), false);
227                 ReplyInfo replyInfo = new ReplyInfo(synReply.getHeaders(), synReply.isClose());
228                 notifyOnReply(replyInfo);
229                 break;
230             }
231             case HEADERS:
232             {
233                 HeadersFrame headers = (HeadersFrame)frame;
234                 updateCloseState(headers.isClose(), false);
235                 HeadersInfo headersInfo = new HeadersInfo(headers.getHeaders(), headers.isClose(), headers.isResetCompression());
236                 notifyOnHeaders(headersInfo);
237                 break;
238             }
239             case RST_STREAM:
240             {
241                 reset = true;
242                 break;
243             }
244             default:
245             {
246                 throw new IllegalStateException();
247             }
248         }
249     }
250 
251     @Override
252     public void process(DataInfo dataInfo)
253     {
254         notIdle();
255         // TODO: in v3 we need to send a rst instead of just ignoring
256         // ignore data frame if this stream is remotelyClosed already
257         if (isRemotelyClosed())
258         {
259             LOG.debug("Stream is remotely closed, ignoring {}", dataInfo);
260             return;
261         }
262 
263         if (!canReceive())
264         {
265             LOG.debug("Protocol error receiving {}, resetting", dataInfo);
266             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
267             return;
268         }
269 
270         updateCloseState(dataInfo.isClose(), false);
271         notifyOnData(dataInfo);
272     }
273 
274     @Override
275     public void succeeded()
276     {
277         if (promise != null)
278             promise.succeeded(this);
279     }
280 
281     @Override
282     public void failed(Throwable x)
283     {
284         if (promise != null)
285             promise.failed(x);
286     }
287 
288     private void notifyOnReply(ReplyInfo replyInfo)
289     {
290         final StreamFrameListener listener = this.listener;
291         try
292         {
293             if (listener != null)
294             {
295                 LOG.debug("Invoking reply callback with {} on listener {}", replyInfo, listener);
296                 listener.onReply(this, replyInfo);
297             }
298         }
299         catch (Exception x)
300         {
301             LOG.info("Exception while notifying listener " + listener, x);
302         }
303         catch (Error x)
304         {
305             LOG.info("Exception while notifying listener " + listener, x);
306             throw x;
307         }
308     }
309 
310     private void notifyOnHeaders(HeadersInfo headersInfo)
311     {
312         final StreamFrameListener listener = this.listener;
313         try
314         {
315             if (listener != null)
316             {
317                 LOG.debug("Invoking headers callback with {} on listener {}", headersInfo, listener);
318                 listener.onHeaders(this, headersInfo);
319             }
320         }
321         catch (Exception x)
322         {
323             LOG.info("Exception while notifying listener " + listener, x);
324         }
325         catch (Error x)
326         {
327             LOG.info("Exception while notifying listener " + listener, x);
328             throw x;
329         }
330     }
331 
332     private void notifyOnData(DataInfo dataInfo)
333     {
334         final StreamFrameListener listener = this.listener;
335         try
336         {
337             if (listener != null)
338             {
339                 LOG.debug("Invoking data callback with {} on listener {}", dataInfo, listener);
340                 listener.onData(this, dataInfo);
341                 LOG.debug("Invoked data callback with {} on listener {}", dataInfo, listener);
342             }
343         }
344         catch (Exception x)
345         {
346             LOG.info("Exception while notifying listener " + listener, x);
347         }
348         catch (Error x)
349         {
350             LOG.info("Exception while notifying listener " + listener, x);
351             throw x;
352         }
353     }
354 
355     @Override
356     public Stream push(PushInfo pushInfo) throws InterruptedException, ExecutionException, TimeoutException
357     {
358         FuturePromise<Stream> result = new FuturePromise<>();
359         push(pushInfo, result);
360         if (pushInfo.getTimeout() > 0)
361             return result.get(pushInfo.getTimeout(), pushInfo.getUnit());
362         else
363             return result.get();
364     }
365 
366     @Override
367     public void push(PushInfo pushInfo, Promise<Stream> promise)
368     {
369         notIdle();
370         if (isClosed() || isReset())
371         {
372             promise.failed(new StreamException(getId(), StreamStatus.STREAM_ALREADY_CLOSED,
373                     "Stream: " + this + " already closed or reset!"));
374             return;
375         }
376         PushSynInfo pushSynInfo = new PushSynInfo(getId(), pushInfo);
377         session.syn(pushSynInfo, null, promise);
378     }
379 
380     @Override
381     public void reply(ReplyInfo replyInfo) throws InterruptedException, ExecutionException, TimeoutException
382     {
383         FutureCallback result = new FutureCallback();
384         reply(replyInfo, result);
385         if (replyInfo.getTimeout() > 0)
386             result.get(replyInfo.getTimeout(), replyInfo.getUnit());
387         else
388             result.get();
389     }
390 
391     @Override
392     public void reply(ReplyInfo replyInfo, Callback callback)
393     {
394         notIdle();
395         if (isUnidirectional())
396             throw new IllegalStateException("Protocol violation: cannot send SYN_REPLY frames in unidirectional streams");
397         openState = OpenState.REPLY_SENT;
398         updateCloseState(replyInfo.isClose(), true);
399         SynReplyFrame frame = new SynReplyFrame(session.getVersion(), replyInfo.getFlags(), getId(), replyInfo.getHeaders());
400         session.control(this, frame, replyInfo.getTimeout(), replyInfo.getUnit(), callback);
401     }
402 
403     @Override
404     public void data(DataInfo dataInfo) throws InterruptedException, ExecutionException, TimeoutException
405     {
406         FutureCallback result = new FutureCallback();
407         data(dataInfo, result);
408         if (dataInfo.getTimeout() > 0)
409             result.get(dataInfo.getTimeout(), dataInfo.getUnit());
410         else
411             result.get();
412     }
413 
414     @Override
415     public void data(DataInfo dataInfo, Callback callback)
416     {
417         notIdle();
418         if (!canSend())
419         {
420             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
421             throw new IllegalStateException("Protocol violation: cannot send a DATA frame before a SYN_REPLY frame");
422         }
423         if (isLocallyClosed())
424         {
425             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
426             throw new IllegalStateException("Protocol violation: cannot send a DATA frame on a locally closed stream");
427         }
428 
429         // Cannot update the close state here, because the data that we send may
430         // be flow controlled, so we need the stream to update the window size.
431         session.data(this, dataInfo, dataInfo.getTimeout(), dataInfo.getUnit(), callback);
432     }
433 
434     @Override
435     public void headers(HeadersInfo headersInfo) throws InterruptedException, ExecutionException, TimeoutException
436     {
437         FutureCallback result = new FutureCallback();
438         headers(headersInfo, result);
439         if (headersInfo.getTimeout() > 0)
440             result.get(headersInfo.getTimeout(), headersInfo.getUnit());
441         else
442             result.get();
443     }
444 
445     @Override
446     public void headers(HeadersInfo headersInfo, Callback callback)
447     {
448         notIdle();
449         if (!canSend())
450         {
451             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
452             throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame before a SYN_REPLY frame");
453         }
454         if (isLocallyClosed())
455         {
456             session.rst(new RstInfo(getId(), StreamStatus.PROTOCOL_ERROR), new Adapter());
457             throw new IllegalStateException("Protocol violation: cannot send a HEADERS frame on a closed stream");
458         }
459 
460         updateCloseState(headersInfo.isClose(), true);
461         HeadersFrame frame = new HeadersFrame(session.getVersion(), headersInfo.getFlags(), getId(), headersInfo.getHeaders());
462         session.control(this, frame, headersInfo.getTimeout(), headersInfo.getUnit(), callback);
463     }
464 
465     @Override
466     public boolean isUnidirectional()
467     {
468         return associatedStream != null;
469     }
470 
471     @Override
472     public boolean isReset()
473     {
474         return reset;
475     }
476 
477     @Override
478     public boolean isHalfClosed()
479     {
480         CloseState closeState = this.closeState;
481         return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
482     }
483 
484     @Override
485     public boolean isClosed()
486     {
487         return closeState == CloseState.CLOSED;
488     }
489 
490     private boolean isLocallyClosed()
491     {
492         CloseState closeState = this.closeState;
493         return closeState == CloseState.LOCALLY_CLOSED || closeState == CloseState.CLOSED;
494     }
495 
496     private boolean isRemotelyClosed()
497     {
498         CloseState closeState = this.closeState;
499         return closeState == CloseState.REMOTELY_CLOSED || closeState == CloseState.CLOSED;
500     }
501 
502     @Override
503     public String toString()
504     {
505         return String.format("stream=%d v%d windowSize=%d reset=%s prio=%d %s %s", getId(), session.getVersion(),
506                 getWindowSize(), isReset(), priority, openState, closeState);
507     }
508 
509     private boolean canSend()
510     {
511         OpenState openState = this.openState;
512         return openState == OpenState.SYN_SENT || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
513     }
514 
515     private boolean canReceive()
516     {
517         OpenState openState = this.openState;
518         return openState == OpenState.SYN_RECV || openState == OpenState.REPLY_RECV || openState == OpenState.REPLY_SENT;
519     }
520 
521     private enum OpenState
522     {
523         SYN_SENT, SYN_RECV, REPLY_SENT, REPLY_RECV
524     }
525 
526     private enum CloseState
527     {
528         OPENED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
529     }
530 }