1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.spdy.parser;
15
16 import java.nio.ByteBuffer;
17
18 import org.eclipse.jetty.spdy.CompressionFactory;
19 import org.eclipse.jetty.spdy.PushSynInfo;
20 import org.eclipse.jetty.spdy.StreamException;
21 import org.eclipse.jetty.spdy.api.Headers;
22 import org.eclipse.jetty.spdy.api.SPDY;
23 import org.eclipse.jetty.spdy.api.StreamStatus;
24 import org.eclipse.jetty.spdy.api.SynInfo;
25 import org.eclipse.jetty.spdy.frames.ControlFrameType;
26 import org.eclipse.jetty.spdy.frames.SynStreamFrame;
27
28 public class SynStreamBodyParser extends ControlFrameBodyParser
29 {
30 private final Headers headers = new Headers();
31 private final ControlFrameParser controlFrameParser;
32 private final HeadersBlockParser headersBlockParser;
33 private State state = State.STREAM_ID;
34 private int cursor;
35 private int streamId;
36 private int associatedStreamId;
37 private byte priority;
38 private short slot;
39
40 public SynStreamBodyParser(CompressionFactory.Decompressor decompressor, ControlFrameParser controlFrameParser)
41 {
42 this.controlFrameParser = controlFrameParser;
43 this.headersBlockParser = new SynStreamHeadersBlockParser(decompressor);
44 }
45
46 @Override
47 public boolean parse(ByteBuffer buffer)
48 {
49 while (buffer.hasRemaining())
50 {
51 switch (state)
52 {
53 case STREAM_ID:
54 {
55 if (buffer.remaining() >= 4)
56 {
57 streamId = buffer.getInt() & 0x7F_FF_FF_FF;
58 state = State.ASSOCIATED_STREAM_ID;
59 }
60 else
61 {
62 state = State.STREAM_ID_BYTES;
63 cursor = 4;
64 }
65 break;
66 }
67 case STREAM_ID_BYTES:
68 {
69 byte currByte = buffer.get();
70 --cursor;
71 streamId += (currByte & 0xFF) << 8 * cursor;
72 if (cursor == 0)
73 {
74 streamId &= 0x7F_FF_FF_FF;
75 state = State.ASSOCIATED_STREAM_ID;
76 }
77 break;
78 }
79 case ASSOCIATED_STREAM_ID:
80 {
81
82
83 checkVersion(controlFrameParser.getVersion(), streamId);
84
85 if (buffer.remaining() >= 4)
86 {
87 associatedStreamId = buffer.getInt() & 0x7F_FF_FF_FF;
88 state = State.PRIORITY;
89 }
90 else
91 {
92 state = State.ASSOCIATED_STREAM_ID_BYTES;
93 cursor = 4;
94 }
95 break;
96 }
97 case ASSOCIATED_STREAM_ID_BYTES:
98 {
99 byte currByte = buffer.get();
100 --cursor;
101 associatedStreamId += (currByte & 0xFF) << 8 * cursor;
102 if (cursor == 0)
103 {
104 associatedStreamId &= 0x7F_FF_FF_FF;
105 state = State.PRIORITY;
106 }
107 break;
108 }
109 case PRIORITY:
110 {
111 byte currByte = buffer.get();
112 ++cursor;
113 if (cursor == 1)
114 {
115 priority = readPriority(controlFrameParser.getVersion(), currByte);
116 }
117 else
118 {
119 slot = (short)(currByte & 0xFF);
120 if (slot < 0)
121 throw new StreamException(streamId, StreamStatus.INVALID_CREDENTIALS);
122 cursor = 0;
123 state = State.HEADERS;
124 }
125 break;
126 }
127 case HEADERS:
128 {
129 short version = controlFrameParser.getVersion();
130 int length = controlFrameParser.getLength() - 10;
131 if (headersBlockParser.parse(streamId, version, length, buffer))
132 {
133 byte flags = controlFrameParser.getFlags();
134 if (flags > (SynInfo.FLAG_CLOSE | PushSynInfo.FLAG_UNIDIRECTIONAL))
135 throw new IllegalArgumentException("Invalid flag " + flags + " for frame " + ControlFrameType.SYN_STREAM);
136
137 SynStreamFrame frame = new SynStreamFrame(version, flags, streamId, associatedStreamId, priority, slot, new Headers(headers, true));
138 controlFrameParser.onControlFrame(frame);
139
140 reset();
141 return true;
142 }
143 break;
144 }
145 default:
146 {
147 throw new IllegalStateException();
148 }
149 }
150 }
151 return false;
152 }
153
154 private void checkVersion(short version, int streamId)
155 {
156 if (version != SPDY.V2 && version != SPDY.V3)
157 throw new StreamException(streamId, StreamStatus.UNSUPPORTED_VERSION);
158 }
159
160 private byte readPriority(short version, byte currByte)
161 {
162
163
164 switch (version)
165 {
166 case SPDY.V2:
167 int p2 = currByte & 0b1100_0000;
168 p2 >>>= 6;
169 return (byte)p2;
170 case SPDY.V3:
171 int p3 = currByte & 0b1110_0000;
172 p3 >>>= 5;
173 return (byte)p3;
174 default:
175 throw new IllegalStateException();
176 }
177 }
178
179 private void reset()
180 {
181 headers.clear();
182 state = State.STREAM_ID;
183 cursor = 0;
184 streamId = 0;
185 associatedStreamId = 0;
186 priority = 0;
187 }
188
189 private enum State
190 {
191 STREAM_ID, STREAM_ID_BYTES, ASSOCIATED_STREAM_ID, ASSOCIATED_STREAM_ID_BYTES, PRIORITY, HEADERS
192 }
193
194 private class SynStreamHeadersBlockParser extends HeadersBlockParser
195 {
196 public SynStreamHeadersBlockParser(CompressionFactory.Decompressor decompressor)
197 {
198 super(decompressor);
199 }
200
201 @Override
202 protected void onHeader(String name, String[] values)
203 {
204 for (String value : values)
205 headers.add(name, value);
206 }
207 }
208 }