1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.spdy.parser;
20
21 import java.nio.ByteBuffer;
22
23 import org.eclipse.jetty.spdy.CompressionFactory;
24 import org.eclipse.jetty.spdy.PushSynInfo;
25 import org.eclipse.jetty.spdy.StreamException;
26 import org.eclipse.jetty.spdy.api.Headers;
27 import org.eclipse.jetty.spdy.api.SPDY;
28 import org.eclipse.jetty.spdy.api.StreamStatus;
29 import org.eclipse.jetty.spdy.api.SynInfo;
30 import org.eclipse.jetty.spdy.frames.ControlFrameType;
31 import org.eclipse.jetty.spdy.frames.SynStreamFrame;
32
33 public class SynStreamBodyParser extends ControlFrameBodyParser
34 {
35 private final Headers headers = new Headers();
36 private final ControlFrameParser controlFrameParser;
37 private final HeadersBlockParser headersBlockParser;
38 private State state = State.STREAM_ID;
39 private int cursor;
40 private int streamId;
41 private int associatedStreamId;
42 private byte priority;
43 private short slot;
44
45 public SynStreamBodyParser(CompressionFactory.Decompressor decompressor, ControlFrameParser controlFrameParser)
46 {
47 this.controlFrameParser = controlFrameParser;
48 this.headersBlockParser = new SynStreamHeadersBlockParser(decompressor);
49 }
50
51 @Override
52 public boolean parse(ByteBuffer buffer)
53 {
54 while (buffer.hasRemaining())
55 {
56 switch (state)
57 {
58 case STREAM_ID:
59 {
60 if (buffer.remaining() >= 4)
61 {
62 streamId = buffer.getInt() & 0x7F_FF_FF_FF;
63 state = State.ASSOCIATED_STREAM_ID;
64 }
65 else
66 {
67 state = State.STREAM_ID_BYTES;
68 cursor = 4;
69 }
70 break;
71 }
72 case STREAM_ID_BYTES:
73 {
74 byte currByte = buffer.get();
75 --cursor;
76 streamId += (currByte & 0xFF) << 8 * cursor;
77 if (cursor == 0)
78 {
79 streamId &= 0x7F_FF_FF_FF;
80 state = State.ASSOCIATED_STREAM_ID;
81 }
82 break;
83 }
84 case ASSOCIATED_STREAM_ID:
85 {
86
87
88 checkVersion(controlFrameParser.getVersion(), streamId);
89
90 if (buffer.remaining() >= 4)
91 {
92 associatedStreamId = buffer.getInt() & 0x7F_FF_FF_FF;
93 state = State.PRIORITY;
94 }
95 else
96 {
97 state = State.ASSOCIATED_STREAM_ID_BYTES;
98 cursor = 4;
99 }
100 break;
101 }
102 case ASSOCIATED_STREAM_ID_BYTES:
103 {
104 byte currByte = buffer.get();
105 --cursor;
106 associatedStreamId += (currByte & 0xFF) << 8 * cursor;
107 if (cursor == 0)
108 {
109 associatedStreamId &= 0x7F_FF_FF_FF;
110 state = State.PRIORITY;
111 }
112 break;
113 }
114 case PRIORITY:
115 {
116 byte currByte = buffer.get();
117 ++cursor;
118 if (cursor == 1)
119 {
120 priority = readPriority(controlFrameParser.getVersion(), currByte);
121 }
122 else
123 {
124 slot = (short)(currByte & 0xFF);
125 if (slot < 0)
126 throw new StreamException(streamId, StreamStatus.INVALID_CREDENTIALS);
127 cursor = 0;
128 state = State.HEADERS;
129 }
130 break;
131 }
132 case HEADERS:
133 {
134 short version = controlFrameParser.getVersion();
135 int length = controlFrameParser.getLength() - 10;
136 if (headersBlockParser.parse(streamId, version, length, buffer))
137 {
138 byte flags = controlFrameParser.getFlags();
139 if (flags > (SynInfo.FLAG_CLOSE | PushSynInfo.FLAG_UNIDIRECTIONAL))
140 throw new IllegalArgumentException("Invalid flag " + flags + " for frame " + ControlFrameType.SYN_STREAM);
141
142 SynStreamFrame frame = new SynStreamFrame(version, flags, streamId, associatedStreamId, priority, slot, new Headers(headers, true));
143 controlFrameParser.onControlFrame(frame);
144
145 reset();
146 return true;
147 }
148 break;
149 }
150 default:
151 {
152 throw new IllegalStateException();
153 }
154 }
155 }
156 return false;
157 }
158
159 private void checkVersion(short version, int streamId)
160 {
161 if (version != SPDY.V2 && version != SPDY.V3)
162 throw new StreamException(streamId, StreamStatus.UNSUPPORTED_VERSION);
163 }
164
165 private byte readPriority(short version, byte currByte)
166 {
167
168
169 switch (version)
170 {
171 case SPDY.V2:
172 int p2 = currByte & 0b1100_0000;
173 p2 >>>= 6;
174 return (byte)p2;
175 case SPDY.V3:
176 int p3 = currByte & 0b1110_0000;
177 p3 >>>= 5;
178 return (byte)p3;
179 default:
180 throw new IllegalStateException();
181 }
182 }
183
184 private void reset()
185 {
186 headers.clear();
187 state = State.STREAM_ID;
188 cursor = 0;
189 streamId = 0;
190 associatedStreamId = 0;
191 priority = 0;
192 }
193
194 private enum State
195 {
196 STREAM_ID, STREAM_ID_BYTES, ASSOCIATED_STREAM_ID, ASSOCIATED_STREAM_ID_BYTES, PRIORITY, HEADERS
197 }
198
199 private class SynStreamHeadersBlockParser extends HeadersBlockParser
200 {
201 public SynStreamHeadersBlockParser(CompressionFactory.Decompressor decompressor)
202 {
203 super(decompressor);
204 }
205
206 @Override
207 protected void onHeader(String name, String[] values)
208 {
209 for (String value : values)
210 headers.add(name, value);
211 }
212 }
213 }