View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.spdy.parser;
20  
21  import java.nio.ByteBuffer;
22  
23  import org.eclipse.jetty.spdy.CompressionFactory;
24  import org.eclipse.jetty.spdy.PushSynInfo;
25  import org.eclipse.jetty.spdy.StreamException;
26  import org.eclipse.jetty.spdy.api.SPDY;
27  import org.eclipse.jetty.spdy.api.StreamStatus;
28  import org.eclipse.jetty.spdy.api.SynInfo;
29  import org.eclipse.jetty.spdy.frames.ControlFrameType;
30  import org.eclipse.jetty.spdy.frames.SynStreamFrame;
31  import org.eclipse.jetty.util.Fields;
32  
33  public class SynStreamBodyParser extends ControlFrameBodyParser
34  {
35      private final Fields headers = new Fields();
36      private final ControlFrameParser controlFrameParser;
37      private final HeadersBlockParser headersBlockParser;
38      private State state = State.STREAM_ID;
39      private int cursor;
40      private int streamId;
41      private int associatedStreamId;
42      private byte priority;
43      private short slot;
44  
45      public SynStreamBodyParser(CompressionFactory.Decompressor decompressor, ControlFrameParser controlFrameParser)
46      {
47          this.controlFrameParser = controlFrameParser;
48          this.headersBlockParser = new SynStreamHeadersBlockParser(decompressor);
49      }
50  
51      @Override
52      public boolean parse(ByteBuffer buffer)
53      {
54          while (buffer.hasRemaining())
55          {
56              switch (state)
57              {
58                  case STREAM_ID:
59                  {
60                      if (buffer.remaining() >= 4)
61                      {
62                          streamId = buffer.getInt() & 0x7F_FF_FF_FF;
63                          state = State.ASSOCIATED_STREAM_ID;
64                      }
65                      else
66                      {
67                          state = State.STREAM_ID_BYTES;
68                          cursor = 4;
69                      }
70                      break;
71                  }
72                  case STREAM_ID_BYTES:
73                  {
74                      byte currByte = buffer.get();
75                      --cursor;
76                      streamId += (currByte & 0xFF) << 8 * cursor;
77                      if (cursor == 0)
78                      {
79                          streamId &= 0x7F_FF_FF_FF;
80                          state = State.ASSOCIATED_STREAM_ID;
81                      }
82                      break;
83                  }
84                  case ASSOCIATED_STREAM_ID:
85                  {
86                      // Now we know the streamId, we can do the version check
87                      // and if it is wrong, issue a RST_STREAM
88                      try
89                      {
90                          checkVersion(controlFrameParser.getVersion(), streamId);
91                      }
92                      catch (StreamException e)
93                      {
94                          // We've already read 4 bytes of the streamId which are part of controlFrameParser.getLength
95                          // so we need to substract those from the bytesToSkip.
96                          int bytesToSkip = controlFrameParser.getLength() - 4;
97                          int remaining = buffer.remaining();
98                          if (remaining >= bytesToSkip)
99                          {
100                             buffer.position(buffer.position() + bytesToSkip);
101                             controlFrameParser.reset();
102                             reset();
103                         }
104                         else
105                         {
106                             int bytesToSkipInNextBuffer = bytesToSkip - remaining;
107                             buffer.position(buffer.limit());
108                             controlFrameParser.skip(bytesToSkipInNextBuffer);
109                             reset();
110                         }
111                         throw e;
112                     }
113                     if (buffer.remaining() >= 4)
114                     {
115                         associatedStreamId = buffer.getInt() & 0x7F_FF_FF_FF;
116                         state = State.PRIORITY;
117                     }
118                     else
119                     {
120                         state = State.ASSOCIATED_STREAM_ID_BYTES;
121                         cursor = 4;
122                     }
123                     break;
124                 }
125                 case ASSOCIATED_STREAM_ID_BYTES:
126                 {
127                     byte currByte = buffer.get();
128                     --cursor;
129                     associatedStreamId += (currByte & 0xFF) << 8 * cursor;
130                     if (cursor == 0)
131                     {
132                         associatedStreamId &= 0x7F_FF_FF_FF;
133                         state = State.PRIORITY;
134                     }
135                     break;
136                 }
137                 case PRIORITY:
138                 {
139                     byte currByte = buffer.get();
140                     ++cursor;
141                     if (cursor == 1)
142                     {
143                         priority = readPriority(controlFrameParser.getVersion(), currByte);
144                     }
145                     else
146                     {
147                         slot = (short)(currByte & 0xFF);
148                         cursor = 0;
149                         state = State.HEADERS;
150                     }
151                     break;
152                 }
153                 case HEADERS:
154                 {
155                     short version = controlFrameParser.getVersion();
156                     int length = controlFrameParser.getLength() - 10;
157                     if (headersBlockParser.parse(streamId, version, length, buffer))
158                     {
159                         byte flags = controlFrameParser.getFlags();
160                         if (flags > (SynInfo.FLAG_CLOSE | PushSynInfo.FLAG_UNIDIRECTIONAL))
161                             throw new IllegalArgumentException("Invalid flag " + flags + " for frame " +
162                                     ControlFrameType.SYN_STREAM);
163 
164                         SynStreamFrame frame = new SynStreamFrame(version, flags, streamId, associatedStreamId,
165                                 priority, slot, new Fields(headers, false));
166                         controlFrameParser.onControlFrame(frame);
167 
168                         reset();
169                         return true;
170                     }
171                     break;
172                 }
173                 default:
174                 {
175                     throw new IllegalStateException();
176                 }
177             }
178         }
179         return false;
180     }
181 
182     private void checkVersion(short version, int streamId)
183     {
184         if (version != SPDY.V2 && version != SPDY.V3)
185             throw new StreamException(streamId, StreamStatus.UNSUPPORTED_VERSION);
186     }
187 
188     private byte readPriority(short version, byte currByte)
189     {
190         // Right shift retains the sign bit when operated on a byte,
191         // so we use an int to perform the shifts
192         switch (version)
193         {
194             case SPDY.V2:
195                 int p2 = currByte & 0b1100_0000;
196                 p2 >>>= 6;
197                 return (byte)p2;
198             case SPDY.V3:
199                 int p3 = currByte & 0b1110_0000;
200                 p3 >>>= 5;
201                 return (byte)p3;
202             default:
203                 throw new IllegalStateException();
204         }
205     }
206 
207     private void reset()
208     {
209         headers.clear();
210         state = State.STREAM_ID;
211         cursor = 0;
212         streamId = 0;
213         associatedStreamId = 0;
214         priority = 0;
215     }
216 
217     private enum State
218     {
219         STREAM_ID, STREAM_ID_BYTES, ASSOCIATED_STREAM_ID, ASSOCIATED_STREAM_ID_BYTES, PRIORITY, HEADERS
220     }
221 
222     private class SynStreamHeadersBlockParser extends HeadersBlockParser
223     {
224         public SynStreamHeadersBlockParser(CompressionFactory.Decompressor decompressor)
225         {
226             super(decompressor);
227         }
228 
229         @Override
230         protected void onHeader(String name, String[] values)
231         {
232             for (String value : values)
233                 headers.add(name, value);
234         }
235     }
236 }