1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common.extensions.mux;
20
21 import java.nio.ByteBuffer;
22
23 import org.eclipse.jetty.util.BufferUtil;
24 import org.eclipse.jetty.util.log.Log;
25 import org.eclipse.jetty.util.log.Logger;
26 import org.eclipse.jetty.websocket.api.extensions.Frame;
27 import org.eclipse.jetty.websocket.common.OpCode;
28 import org.eclipse.jetty.websocket.common.WebSocketFrame;
29 import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxAddChannelRequest;
30 import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxAddChannelResponse;
31 import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxDropChannel;
32 import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxFlowControl;
33 import org.eclipse.jetty.websocket.common.extensions.mux.op.MuxNewChannelSlot;
34
35 public class MuxParser
36 {
37 public static interface Listener
38 {
39 public void onMuxAddChannelRequest(MuxAddChannelRequest request);
40
41 public void onMuxAddChannelResponse(MuxAddChannelResponse response);
42
43 public void onMuxDropChannel(MuxDropChannel drop);
44
45 public void onMuxedFrame(MuxedFrame frame);
46
47 public void onMuxException(MuxException e);
48
49 public void onMuxFlowControl(MuxFlowControl flow);
50
51 public void onMuxNewChannelSlot(MuxNewChannelSlot slot);
52 }
53
54 private final static Logger LOG = Log.getLogger(MuxParser.class);
55
56 private MuxedFrame muxframe = new MuxedFrame();
57 private MuxParser.Listener events;
58 private long channelId;
59
60 public MuxParser.Listener getEvents()
61 {
62 return events;
63 }
64
65
66
67
68
69
70
71 public synchronized void parse(Frame frame)
72 {
73 if (events == null)
74 {
75 throw new RuntimeException("No " + MuxParser.Listener.class + " specified");
76 }
77
78 if (!frame.hasPayload())
79 {
80 LOG.debug("No payload data, skipping");
81 return;
82 }
83
84 if (frame.getType().getOpCode() != OpCode.BINARY)
85 {
86 LOG.debug("Not a binary opcode (base frame), skipping");
87 return;
88 }
89
90 LOG.debug("Parsing Mux Payload of {}",frame);
91
92 try
93 {
94 ByteBuffer buffer = frame.getPayload().slice();
95
96 if (buffer.remaining() <= 0)
97 {
98 return;
99 }
100
101 if (frame.isContinuation())
102 {
103 muxframe.reset();
104 muxframe.setFin(frame.isFin());
105 muxframe.setFin(frame.isRsv1());
106 muxframe.setFin(frame.isRsv2());
107 muxframe.setFin(frame.isRsv3());
108 muxframe.setContinuation(true);
109 parseDataFramePayload(buffer);
110 }
111 else
112 {
113
114 channelId = readChannelId(buffer);
115 if (channelId == 0)
116 {
117 parseControlBlocks(buffer);
118 }
119 else
120 {
121 parseDataFrame(buffer);
122 }
123 }
124 }
125 catch (MuxException e)
126 {
127 events.onMuxException(e);
128 }
129 catch (Throwable t)
130 {
131 events.onMuxException(new MuxException(t));
132 }
133 }
134
135 private void parseControlBlocks(ByteBuffer buffer)
136 {
137
138 while (buffer.remaining() > 0)
139 {
140 byte b = buffer.get();
141 byte opc = (byte)((byte)(b >> 5) & 0xFF);
142 b = (byte)(b & 0x1F);
143
144 try {
145 switch (opc)
146 {
147 case MuxOp.ADD_CHANNEL_REQUEST:
148 {
149 MuxAddChannelRequest op = new MuxAddChannelRequest();
150 op.setRsv((byte)((b & 0x1C) >> 2));
151 op.setEncoding((byte)(b & 0x03));
152 op.setChannelId(readChannelId(buffer));
153 long handshakeSize = read139EncodedSize(buffer);
154 op.setHandshake(readBlock(buffer,handshakeSize));
155 events.onMuxAddChannelRequest(op);
156 break;
157 }
158 case MuxOp.ADD_CHANNEL_RESPONSE:
159 {
160 MuxAddChannelResponse op = new MuxAddChannelResponse();
161 op.setFailed((b & 0x10) != 0);
162 op.setRsv((byte)((byte)(b & 0x0C) >> 2));
163 op.setEncoding((byte)(b & 0x03));
164 op.setChannelId(readChannelId(buffer));
165 long handshakeSize = read139EncodedSize(buffer);
166 op.setHandshake(readBlock(buffer,handshakeSize));
167 events.onMuxAddChannelResponse(op);
168 break;
169 }
170 case MuxOp.DROP_CHANNEL:
171 {
172 int rsv = (b & 0x1F);
173 long channelId = readChannelId(buffer);
174 long reasonSize = read139EncodedSize(buffer);
175 ByteBuffer reasonBuf = readBlock(buffer,reasonSize);
176 MuxDropChannel op = MuxDropChannel.parse(channelId,reasonBuf);
177 op.setRsv(rsv);
178 events.onMuxDropChannel(op);
179 break;
180 }
181 case MuxOp.FLOW_CONTROL:
182 {
183 MuxFlowControl op = new MuxFlowControl();
184 op.setRsv((byte)(b & 0x1F));
185 op.setChannelId(readChannelId(buffer));
186 op.setSendQuotaSize(read139EncodedSize(buffer));
187 events.onMuxFlowControl(op);
188 break;
189 }
190 case MuxOp.NEW_CHANNEL_SLOT:
191 {
192 MuxNewChannelSlot op = new MuxNewChannelSlot();
193 op.setRsv((byte)((b & 0x1E) >> 1));
194 op.setFallback((b & 0x01) != 0);
195 op.setNumberOfSlots(read139EncodedSize(buffer));
196 op.setInitialSendQuota(read139EncodedSize(buffer));
197 events.onMuxNewChannelSlot(op);
198 break;
199 }
200 default:
201 {
202 String err = String.format("Unknown Mux Control Code OPC [0x%X]",opc);
203 throw new MuxException(err);
204 }
205 }
206 }
207 catch (Throwable t)
208 {
209 LOG.warn(t);
210 throw new MuxException(t);
211 }
212 }
213 }
214
215 private void parseDataFrame(ByteBuffer buffer)
216 {
217 byte b = buffer.get();
218 boolean fin = ((b & 0x80) != 0);
219 boolean rsv1 = ((b & 0x40) != 0);
220 boolean rsv2 = ((b & 0x20) != 0);
221 boolean rsv3 = ((b & 0x10) != 0);
222 byte opcode = (byte)(b & 0x0F);
223
224 if (opcode == OpCode.CONTINUATION)
225 {
226 muxframe.setContinuation(true);
227 }
228 else
229 {
230 muxframe.reset();
231 muxframe.setOpCode(opcode);
232 }
233
234 muxframe.setChannelId(channelId);
235 muxframe.setFin(fin);
236 muxframe.setRsv1(rsv1);
237 muxframe.setRsv2(rsv2);
238 muxframe.setRsv3(rsv3);
239
240 parseDataFramePayload(buffer);
241 }
242
243 private void parseDataFramePayload(ByteBuffer buffer)
244 {
245 int capacity = buffer.remaining();
246 ByteBuffer payload = ByteBuffer.allocate(capacity);
247 payload.put(buffer);
248 BufferUtil.flipToFlush(payload,0);
249 muxframe.setPayload(payload);
250 try
251 {
252 LOG.debug("notifyFrame() - {}",muxframe);
253 events.onMuxedFrame(muxframe);
254 }
255 catch (Throwable t)
256 {
257 LOG.warn(t);
258 }
259 }
260
261
262
263
264
265
266
267
268
269
270
271 public long read139EncodedSize(ByteBuffer buffer)
272 {
273 long ret = -1;
274 long minValue = 0x00;
275 int cursor = 0;
276
277 byte b = buffer.get();
278 ret = (b & 0x7F);
279
280 if (ret == 0x7F)
281 {
282
283 ret = 0;
284 minValue = 0xFF_FF;
285 cursor = 8;
286 }
287 else if (ret == 0x7E)
288 {
289
290 ret = 0;
291 minValue = 0x7F;
292 cursor = 2;
293 }
294 else
295 {
296
297
298 return ret;
299 }
300
301
302 while (cursor > 0)
303 {
304 ret = ret << 8;
305 b = buffer.get();
306 ret |= (b & 0xFF);
307 --cursor;
308 }
309
310
311 if (ret <= minValue)
312 {
313 String err = String.format("Invalid 1/3/9 length 0x%X (minimum value for chosen encoding is 0x%X)",ret,minValue);
314 throw new MuxException(err);
315 }
316
317 return ret;
318 }
319
320 private ByteBuffer readBlock(ByteBuffer buffer, long size)
321 {
322 if (size == 0)
323 {
324 return null;
325 }
326
327 if (size > buffer.remaining())
328 {
329 String err = String.format("Truncated data, expected %,d byte(s), but only %,d byte(s) remain",size,buffer.remaining());
330 throw new MuxException(err);
331 }
332
333 if (size > Integer.MAX_VALUE)
334 {
335 String err = String.format("[Int-Sane!] Buffer size %,d is too large to be supported (max allowed is %,d)",size,Integer.MAX_VALUE);
336 throw new MuxException(err);
337 }
338
339 ByteBuffer ret = ByteBuffer.allocate((int)size);
340 BufferUtil.put(buffer,ret);
341 BufferUtil.flipToFlush(ret,0);
342 return ret;
343 }
344
345
346
347
348
349
350
351
352
353
354 public long readChannelId(ByteBuffer buffer)
355 {
356 long id = -1;
357 long minValue = 0x00;
358 byte b = buffer.get();
359 int cursor = -1;
360 if ((b & 0x80) == 0)
361 {
362
363
364 return (b & 0x7F);
365 }
366 else if ((b & 0x40) == 0)
367 {
368
369 id = (b & 0x3F);
370 minValue = 0x7F;
371 cursor = 1;
372 }
373 else if ((b & 0x20) == 0)
374 {
375
376 id = (b & 0x1F);
377 minValue = 0x3F_FF;
378 cursor = 2;
379 }
380 else
381 {
382
383 id = (b & 0x1F);
384 minValue = 0x1F_FF_FF;
385 cursor = 3;
386 }
387
388 while (cursor > 0)
389 {
390 id = id << 8;
391 b = buffer.get();
392 id |= (b & 0xFF);
393 --cursor;
394 }
395
396
397 if (id <= minValue)
398 {
399 String err = String.format("Invalid Channel ID 0x%X (minimum value for chosen encoding is 0x%X)",id,minValue);
400 throw new MuxException(err);
401 }
402
403 return id;
404 }
405
406 public void setEvents(MuxParser.Listener events)
407 {
408 this.events = events;
409 }
410 }