1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy.parser;
20
21 import java.nio.ByteBuffer;
22
23 import org.eclipse.jetty.spdy.CompressionFactory;
24 import org.eclipse.jetty.spdy.PushSynInfo;
25 import org.eclipse.jetty.spdy.StreamException;
26 import org.eclipse.jetty.spdy.api.SPDY;
27 import org.eclipse.jetty.spdy.api.StreamStatus;
28 import org.eclipse.jetty.spdy.api.SynInfo;
29 import org.eclipse.jetty.spdy.frames.ControlFrameType;
30 import org.eclipse.jetty.spdy.frames.SynStreamFrame;
31 import org.eclipse.jetty.util.Fields;
32
33 public class SynStreamBodyParser extends ControlFrameBodyParser
34 {
35 private final Fields headers = new Fields();
36 private final ControlFrameParser controlFrameParser;
37 private final HeadersBlockParser headersBlockParser;
38 private State state = State.STREAM_ID;
39 private int cursor;
40 private int streamId;
41 private int associatedStreamId;
42 private byte priority;
43 private short slot;
44
45 public SynStreamBodyParser(CompressionFactory.Decompressor decompressor, ControlFrameParser controlFrameParser)
46 {
47 this.controlFrameParser = controlFrameParser;
48 this.headersBlockParser = new SynStreamHeadersBlockParser(decompressor);
49 }
50
51 @Override
52 public boolean parse(ByteBuffer buffer)
53 {
54 while (buffer.hasRemaining())
55 {
56 switch (state)
57 {
58 case STREAM_ID:
59 {
60 if (buffer.remaining() >= 4)
61 {
62 streamId = buffer.getInt() & 0x7F_FF_FF_FF;
63 state = State.ASSOCIATED_STREAM_ID;
64 }
65 else
66 {
67 state = State.STREAM_ID_BYTES;
68 cursor = 4;
69 }
70 break;
71 }
72 case STREAM_ID_BYTES:
73 {
74 byte currByte = buffer.get();
75 --cursor;
76 streamId += (currByte & 0xFF) << 8 * cursor;
77 if (cursor == 0)
78 {
79 streamId &= 0x7F_FF_FF_FF;
80 state = State.ASSOCIATED_STREAM_ID;
81 }
82 break;
83 }
84 case ASSOCIATED_STREAM_ID:
85 {
86
87
88 checkVersion(controlFrameParser.getVersion(), streamId);
89
90 if (buffer.remaining() >= 4)
91 {
92 associatedStreamId = buffer.getInt() & 0x7F_FF_FF_FF;
93 state = State.PRIORITY;
94 }
95 else
96 {
97 state = State.ASSOCIATED_STREAM_ID_BYTES;
98 cursor = 4;
99 }
100 break;
101 }
102 case ASSOCIATED_STREAM_ID_BYTES:
103 {
104 byte currByte = buffer.get();
105 --cursor;
106 associatedStreamId += (currByte & 0xFF) << 8 * cursor;
107 if (cursor == 0)
108 {
109 associatedStreamId &= 0x7F_FF_FF_FF;
110 state = State.PRIORITY;
111 }
112 break;
113 }
114 case PRIORITY:
115 {
116 byte currByte = buffer.get();
117 ++cursor;
118 if (cursor == 1)
119 {
120 priority = readPriority(controlFrameParser.getVersion(), currByte);
121 }
122 else
123 {
124 slot = (short)(currByte & 0xFF);
125 if (slot < 0)
126 throw new StreamException(streamId, StreamStatus.INVALID_CREDENTIALS);
127 cursor = 0;
128 state = State.HEADERS;
129 }
130 break;
131 }
132 case HEADERS:
133 {
134 short version = controlFrameParser.getVersion();
135 int length = controlFrameParser.getLength() - 10;
136 if (headersBlockParser.parse(streamId, version, length, buffer))
137 {
138 byte flags = controlFrameParser.getFlags();
139 if (flags > (SynInfo.FLAG_CLOSE | PushSynInfo.FLAG_UNIDIRECTIONAL))
140 throw new IllegalArgumentException("Invalid flag " + flags + " for frame " +
141 ControlFrameType.SYN_STREAM);
142
143 SynStreamFrame frame = new SynStreamFrame(version, flags, streamId, associatedStreamId,
144 priority, slot, new Fields(headers, false));
145 controlFrameParser.onControlFrame(frame);
146
147 reset();
148 return true;
149 }
150 break;
151 }
152 default:
153 {
154 throw new IllegalStateException();
155 }
156 }
157 }
158 return false;
159 }
160
161 private void checkVersion(short version, int streamId)
162 {
163 if (version != SPDY.V2 && version != SPDY.V3)
164 throw new StreamException(streamId, StreamStatus.UNSUPPORTED_VERSION);
165 }
166
167 private byte readPriority(short version, byte currByte)
168 {
169
170
171 switch (version)
172 {
173 case SPDY.V2:
174 int p2 = currByte & 0b1100_0000;
175 p2 >>>= 6;
176 return (byte)p2;
177 case SPDY.V3:
178 int p3 = currByte & 0b1110_0000;
179 p3 >>>= 5;
180 return (byte)p3;
181 default:
182 throw new IllegalStateException();
183 }
184 }
185
186 private void reset()
187 {
188 headers.clear();
189 state = State.STREAM_ID;
190 cursor = 0;
191 streamId = 0;
192 associatedStreamId = 0;
193 priority = 0;
194 }
195
196 private enum State
197 {
198 STREAM_ID, STREAM_ID_BYTES, ASSOCIATED_STREAM_ID, ASSOCIATED_STREAM_ID_BYTES, PRIORITY, HEADERS
199 }
200
201 private class SynStreamHeadersBlockParser extends HeadersBlockParser
202 {
203 public SynStreamHeadersBlockParser(CompressionFactory.Decompressor decompressor)
204 {
205 super(decompressor);
206 }
207
208 @Override
209 protected void onHeader(String name, String[] values)
210 {
211 for (String value : values)
212 headers.add(name, value);
213 }
214 }
215 }