1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy.parser;
20
21 import java.nio.ByteBuffer;
22
23 import org.eclipse.jetty.spdy.CompressionFactory;
24 import org.eclipse.jetty.spdy.PushSynInfo;
25 import org.eclipse.jetty.spdy.StreamException;
26 import org.eclipse.jetty.spdy.api.SPDY;
27 import org.eclipse.jetty.spdy.api.StreamStatus;
28 import org.eclipse.jetty.spdy.api.SynInfo;
29 import org.eclipse.jetty.spdy.frames.ControlFrameType;
30 import org.eclipse.jetty.spdy.frames.SynStreamFrame;
31 import org.eclipse.jetty.util.Fields;
32
33 public class SynStreamBodyParser extends ControlFrameBodyParser
34 {
35 private final Fields headers = new Fields();
36 private final ControlFrameParser controlFrameParser;
37 private final HeadersBlockParser headersBlockParser;
38 private State state = State.STREAM_ID;
39 private int cursor;
40 private int streamId;
41 private int associatedStreamId;
42 private byte priority;
43 private short slot;
44
45 public SynStreamBodyParser(CompressionFactory.Decompressor decompressor, ControlFrameParser controlFrameParser)
46 {
47 this.controlFrameParser = controlFrameParser;
48 this.headersBlockParser = new SynStreamHeadersBlockParser(decompressor);
49 }
50
51 @Override
52 public boolean parse(ByteBuffer buffer)
53 {
54 while (buffer.hasRemaining())
55 {
56 switch (state)
57 {
58 case STREAM_ID:
59 {
60 if (buffer.remaining() >= 4)
61 {
62 streamId = buffer.getInt() & 0x7F_FF_FF_FF;
63 state = State.ASSOCIATED_STREAM_ID;
64 }
65 else
66 {
67 state = State.STREAM_ID_BYTES;
68 cursor = 4;
69 }
70 break;
71 }
72 case STREAM_ID_BYTES:
73 {
74 byte currByte = buffer.get();
75 --cursor;
76 streamId += (currByte & 0xFF) << 8 * cursor;
77 if (cursor == 0)
78 {
79 streamId &= 0x7F_FF_FF_FF;
80 state = State.ASSOCIATED_STREAM_ID;
81 }
82 break;
83 }
84 case ASSOCIATED_STREAM_ID:
85 {
86
87
88 try
89 {
90 checkVersion(controlFrameParser.getVersion(), streamId);
91 }
92 catch (StreamException e)
93 {
94
95
96 int bytesToSkip = controlFrameParser.getLength() - 4;
97 int remaining = buffer.remaining();
98 if (remaining >= bytesToSkip)
99 {
100 buffer.position(buffer.position() + bytesToSkip);
101 controlFrameParser.reset();
102 reset();
103 }
104 else
105 {
106 int bytesToSkipInNextBuffer = bytesToSkip - remaining;
107 buffer.position(buffer.limit());
108 controlFrameParser.skip(bytesToSkipInNextBuffer);
109 reset();
110 }
111 throw e;
112 }
113 if (buffer.remaining() >= 4)
114 {
115 associatedStreamId = buffer.getInt() & 0x7F_FF_FF_FF;
116 state = State.PRIORITY;
117 }
118 else
119 {
120 state = State.ASSOCIATED_STREAM_ID_BYTES;
121 cursor = 4;
122 }
123 break;
124 }
125 case ASSOCIATED_STREAM_ID_BYTES:
126 {
127 byte currByte = buffer.get();
128 --cursor;
129 associatedStreamId += (currByte & 0xFF) << 8 * cursor;
130 if (cursor == 0)
131 {
132 associatedStreamId &= 0x7F_FF_FF_FF;
133 state = State.PRIORITY;
134 }
135 break;
136 }
137 case PRIORITY:
138 {
139 byte currByte = buffer.get();
140 ++cursor;
141 if (cursor == 1)
142 {
143 priority = readPriority(controlFrameParser.getVersion(), currByte);
144 }
145 else
146 {
147 slot = (short)(currByte & 0xFF);
148 cursor = 0;
149 state = State.HEADERS;
150 }
151 break;
152 }
153 case HEADERS:
154 {
155 short version = controlFrameParser.getVersion();
156 int length = controlFrameParser.getLength() - 10;
157 if (headersBlockParser.parse(streamId, version, length, buffer))
158 {
159 byte flags = controlFrameParser.getFlags();
160 if (flags > (SynInfo.FLAG_CLOSE | PushSynInfo.FLAG_UNIDIRECTIONAL))
161 throw new IllegalArgumentException("Invalid flag " + flags + " for frame " +
162 ControlFrameType.SYN_STREAM);
163
164 SynStreamFrame frame = new SynStreamFrame(version, flags, streamId, associatedStreamId,
165 priority, slot, new Fields(headers, false));
166 controlFrameParser.onControlFrame(frame);
167
168 reset();
169 return true;
170 }
171 break;
172 }
173 default:
174 {
175 throw new IllegalStateException();
176 }
177 }
178 }
179 return false;
180 }
181
182 private void checkVersion(short version, int streamId)
183 {
184 if (version != SPDY.V2 && version != SPDY.V3)
185 throw new StreamException(streamId, StreamStatus.UNSUPPORTED_VERSION);
186 }
187
188 private byte readPriority(short version, byte currByte)
189 {
190
191
192 switch (version)
193 {
194 case SPDY.V2:
195 int p2 = currByte & 0b1100_0000;
196 p2 >>>= 6;
197 return (byte)p2;
198 case SPDY.V3:
199 int p3 = currByte & 0b1110_0000;
200 p3 >>>= 5;
201 return (byte)p3;
202 default:
203 throw new IllegalStateException();
204 }
205 }
206
207 private void reset()
208 {
209 headers.clear();
210 state = State.STREAM_ID;
211 cursor = 0;
212 streamId = 0;
213 associatedStreamId = 0;
214 priority = 0;
215 }
216
217 private enum State
218 {
219 STREAM_ID, STREAM_ID_BYTES, ASSOCIATED_STREAM_ID, ASSOCIATED_STREAM_ID_BYTES, PRIORITY, HEADERS
220 }
221
222 private class SynStreamHeadersBlockParser extends HeadersBlockParser
223 {
224 public SynStreamHeadersBlockParser(CompressionFactory.Decompressor decompressor)
225 {
226 super(decompressor);
227 }
228
229 @Override
230 protected void onHeader(String name, String[] values)
231 {
232 for (String value : values)
233 headers.add(name, value);
234 }
235 }
236 }