1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common;
20
21 import java.nio.ByteBuffer;
22 import java.util.List;
23
24 import org.eclipse.jetty.io.ByteBufferPool;
25 import org.eclipse.jetty.util.BufferUtil;
26 import org.eclipse.jetty.util.log.Log;
27 import org.eclipse.jetty.util.log.Logger;
28 import org.eclipse.jetty.websocket.api.MessageTooLargeException;
29 import org.eclipse.jetty.websocket.api.ProtocolException;
30 import org.eclipse.jetty.websocket.api.WebSocketBehavior;
31 import org.eclipse.jetty.websocket.api.WebSocketException;
32 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
33 import org.eclipse.jetty.websocket.api.extensions.Extension;
34 import org.eclipse.jetty.websocket.api.extensions.Frame;
35 import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
36 import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
37 import org.eclipse.jetty.websocket.common.frames.CloseFrame;
38 import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
39 import org.eclipse.jetty.websocket.common.frames.ControlFrame;
40 import org.eclipse.jetty.websocket.common.frames.PingFrame;
41 import org.eclipse.jetty.websocket.common.frames.PongFrame;
42 import org.eclipse.jetty.websocket.common.frames.TextFrame;
43 import org.eclipse.jetty.websocket.common.io.payload.DeMaskProcessor;
44 import org.eclipse.jetty.websocket.common.io.payload.PayloadProcessor;
45
46
47
48
49 public class Parser
50 {
51 private enum State
52 {
53 START,
54 PAYLOAD_LEN,
55 PAYLOAD_LEN_BYTES,
56 MASK,
57 MASK_BYTES,
58 PAYLOAD
59 }
60
61 private static final Logger LOG = Log.getLogger(Parser.class);
62 private final WebSocketPolicy policy;
63 private final ByteBufferPool bufferPool;
64
65
66 private State state = State.START;
67 private int cursor = 0;
68
69 private WebSocketFrame frame;
70 private boolean priorDataFrame;
71
72 private ByteBuffer payload;
73 private int payloadLength;
74 private PayloadProcessor maskProcessor = new DeMaskProcessor();
75
76
77
78
79
80
81
82
83
84
85
86
87 private byte flagsInUse=0x00;
88
89 private IncomingFrames incomingFramesHandler;
90
91 public Parser(WebSocketPolicy wspolicy, ByteBufferPool bufferPool)
92 {
93 this.bufferPool = bufferPool;
94 this.policy = wspolicy;
95 }
96
97 private void assertSanePayloadLength(long len)
98 {
99 if (LOG.isDebugEnabled()) {
100 LOG.debug("{} Payload Length: {} - {}",policy.getBehavior(),len,this);
101 }
102
103
104 if (len > Integer.MAX_VALUE)
105 {
106
107 throw new MessageTooLargeException("[int-sane!] cannot handle payload lengths larger than " + Integer.MAX_VALUE);
108 }
109
110 switch (frame.getOpCode())
111 {
112 case OpCode.CLOSE:
113 if (len == 1)
114 {
115 throw new ProtocolException("Invalid close frame payload length, [" + payloadLength + "]");
116 }
117
118 case OpCode.PING:
119 case OpCode.PONG:
120 if (len > ControlFrame.MAX_CONTROL_PAYLOAD)
121 {
122 throw new ProtocolException("Invalid control frame payload length, [" + payloadLength + "] cannot exceed ["
123 + ControlFrame.MAX_CONTROL_PAYLOAD + "]");
124 }
125 break;
126 case OpCode.TEXT:
127 policy.assertValidTextMessageSize((int)len);
128 break;
129 case OpCode.BINARY:
130 policy.assertValidBinaryMessageSize((int)len);
131 break;
132 }
133 }
134
135 public void configureFromExtensions(List<? extends Extension> exts)
136 {
137
138 flagsInUse = 0x00;
139
140
141 for (Extension ext : exts)
142 {
143 if (ext.isRsv1User())
144 {
145 flagsInUse = (byte)(flagsInUse | 0x40);
146 }
147 if (ext.isRsv2User())
148 {
149 flagsInUse = (byte)(flagsInUse | 0x20);
150 }
151 if (ext.isRsv3User())
152 {
153 flagsInUse = (byte)(flagsInUse | 0x10);
154 }
155 }
156 }
157
158 public IncomingFrames getIncomingFramesHandler()
159 {
160 return incomingFramesHandler;
161 }
162
163 public WebSocketPolicy getPolicy()
164 {
165 return policy;
166 }
167
168 public boolean isRsv1InUse()
169 {
170 return (flagsInUse & 0x40) != 0;
171 }
172
173 public boolean isRsv2InUse()
174 {
175 return (flagsInUse & 0x20) != 0;
176 }
177
178 public boolean isRsv3InUse()
179 {
180 return (flagsInUse & 0x10) != 0;
181 }
182
183 protected void notifyFrame(final Frame f)
184 {
185 if (LOG.isDebugEnabled())
186 LOG.debug("{} Notify {}",policy.getBehavior(),getIncomingFramesHandler());
187
188 if (policy.getBehavior() == WebSocketBehavior.SERVER)
189 {
190
191
192
193
194
195
196
197
198
199
200 if (!f.isMasked())
201 {
202 throw new ProtocolException("Client MUST mask all frames (RFC-6455: Section 5.1)");
203 }
204 }
205 else if(policy.getBehavior() == WebSocketBehavior.CLIENT)
206 {
207
208 if (f.isMasked())
209 {
210 throw new ProtocolException("Server MUST NOT mask any frames (RFC-6455: Section 5.1)");
211 }
212 }
213
214 if (incomingFramesHandler == null)
215 {
216 return;
217 }
218 try
219 {
220 incomingFramesHandler.incomingFrame(f);
221 }
222 catch (WebSocketException e)
223 {
224 notifyWebSocketException(e);
225 }
226 catch (Throwable t)
227 {
228 LOG.warn(t);
229 notifyWebSocketException(new WebSocketException(t));
230 }
231 }
232
233 protected void notifyWebSocketException(WebSocketException e)
234 {
235 LOG.warn(e);
236 if (incomingFramesHandler == null)
237 {
238 return;
239 }
240 incomingFramesHandler.incomingError(e);
241 }
242
243 public void parse(ByteBuffer buffer) throws WebSocketException
244 {
245 if (buffer.remaining() <= 0)
246 {
247 return;
248 }
249 try
250 {
251
252
253
254 while (parseFrame(buffer))
255 {
256 if (LOG.isDebugEnabled())
257 LOG.debug("{} Parsed Frame: {}",policy.getBehavior(),frame);
258 notifyFrame(frame);
259 if (frame.isDataFrame())
260 {
261 priorDataFrame = !frame.isFin();
262 }
263 reset();
264 }
265 }
266 catch (WebSocketException e)
267 {
268 buffer.position(buffer.limit());
269 reset();
270
271 notifyWebSocketException(e);
272
273 throw e;
274 }
275 catch (Throwable t)
276 {
277 buffer.position(buffer.limit());
278 reset();
279
280 WebSocketException e = new WebSocketException(t);
281 notifyWebSocketException(e);
282
283 throw e;
284 }
285 }
286
287 private void reset()
288 {
289 if (frame != null)
290 frame.reset();
291 frame = null;
292 bufferPool.release(payload);
293 payload = null;
294 }
295
296
297
298
299
300
301
302
303
304
305
306
307 private boolean parseFrame(ByteBuffer buffer)
308 {
309 if (LOG.isDebugEnabled())
310 {
311 LOG.debug("{} Parsing {} bytes",policy.getBehavior(),buffer.remaining());
312 }
313 while (buffer.hasRemaining())
314 {
315 switch (state)
316 {
317 case START:
318 {
319
320 byte b = buffer.get();
321 boolean fin = ((b & 0x80) != 0);
322
323 byte opcode = (byte)(b & 0x0F);
324
325 if (!OpCode.isKnown(opcode))
326 {
327 throw new ProtocolException("Unknown opcode: " + opcode);
328 }
329
330 if (LOG.isDebugEnabled())
331 LOG.debug("{} OpCode {}, fin={} rsv={}{}{}",
332 policy.getBehavior(),
333 OpCode.name(opcode),
334 fin,
335 (isRsv1InUse()?'1':'.'),
336 (isRsv2InUse()?'1':'.'),
337 (isRsv3InUse()?'1':'.'));
338
339
340 switch(opcode)
341 {
342 case OpCode.TEXT:
343 frame = new TextFrame();
344
345 if (priorDataFrame)
346 {
347 throw new ProtocolException("Unexpected " + OpCode.name(opcode) + " frame, was expecting CONTINUATION");
348 }
349 break;
350 case OpCode.BINARY:
351 frame = new BinaryFrame();
352
353 if (priorDataFrame)
354 {
355 throw new ProtocolException("Unexpected " + OpCode.name(opcode) + " frame, was expecting CONTINUATION");
356 }
357 break;
358 case OpCode.CONTINUATION:
359 frame = new ContinuationFrame();
360
361 if (!priorDataFrame)
362 {
363 throw new ProtocolException("CONTINUATION frame without prior !FIN");
364 }
365
366 break;
367 case OpCode.CLOSE:
368 frame = new CloseFrame();
369
370 if (!fin)
371 {
372 throw new ProtocolException("Fragmented Close Frame [" + OpCode.name(opcode) + "]");
373 }
374 break;
375 case OpCode.PING:
376 frame = new PingFrame();
377
378 if (!fin)
379 {
380 throw new ProtocolException("Fragmented Ping Frame [" + OpCode.name(opcode) + "]");
381 }
382 break;
383 case OpCode.PONG:
384 frame = new PongFrame();
385
386 if (!fin)
387 {
388 throw new ProtocolException("Fragmented Pong Frame [" + OpCode.name(opcode) + "]");
389 }
390 break;
391 }
392
393 frame.setFin(fin);
394
395
396 if ((b & 0x70) != 0)
397 {
398
399
400
401
402
403
404 if ((b & 0x40) != 0)
405 {
406 if (isRsv1InUse())
407 frame.setRsv1(true);
408 else
409 throw new ProtocolException("RSV1 not allowed to be set");
410 }
411 if ((b & 0x20) != 0)
412 {
413 if (isRsv2InUse())
414 frame.setRsv2(true);
415 else
416 throw new ProtocolException("RSV2 not allowed to be set");
417 }
418 if ((b & 0x10) != 0)
419 {
420 if (isRsv3InUse())
421 frame.setRsv3(true);
422 else
423 throw new ProtocolException("RSV3 not allowed to be set");
424 }
425 }
426
427 state = State.PAYLOAD_LEN;
428 break;
429 }
430
431 case PAYLOAD_LEN:
432 {
433 byte b = buffer.get();
434 frame.setMasked((b & 0x80) != 0);
435 payloadLength = (byte)(0x7F & b);
436
437 if (payloadLength == 127)
438 {
439
440 payloadLength = 0;
441 state = State.PAYLOAD_LEN_BYTES;
442 cursor = 8;
443 break;
444 }
445 else if (payloadLength == 126)
446 {
447
448 payloadLength = 0;
449 state = State.PAYLOAD_LEN_BYTES;
450 cursor = 2;
451 break;
452 }
453
454 assertSanePayloadLength(payloadLength);
455 if (frame.isMasked())
456 {
457 state = State.MASK;
458 }
459 else
460 {
461
462 if (payloadLength == 0)
463 {
464 state = State.START;
465 return true;
466 }
467
468 maskProcessor.reset(frame);
469 state = State.PAYLOAD;
470 }
471
472 break;
473 }
474
475 case PAYLOAD_LEN_BYTES:
476 {
477 byte b = buffer.get();
478 --cursor;
479 payloadLength |= (b & 0xFF) << (8 * cursor);
480 if (cursor == 0)
481 {
482 assertSanePayloadLength(payloadLength);
483 if (frame.isMasked())
484 {
485 state = State.MASK;
486 }
487 else
488 {
489
490 if (payloadLength == 0)
491 {
492 state = State.START;
493 return true;
494 }
495
496 maskProcessor.reset(frame);
497 state = State.PAYLOAD;
498 }
499 }
500 break;
501 }
502
503 case MASK:
504 {
505 byte m[] = new byte[4];
506 frame.setMask(m);
507 if (buffer.remaining() >= 4)
508 {
509 buffer.get(m,0,4);
510
511 if (payloadLength == 0)
512 {
513 state = State.START;
514 return true;
515 }
516
517 maskProcessor.reset(frame);
518 state = State.PAYLOAD;
519 }
520 else
521 {
522 state = State.MASK_BYTES;
523 cursor = 4;
524 }
525 break;
526 }
527
528 case MASK_BYTES:
529 {
530 byte b = buffer.get();
531 frame.getMask()[4 - cursor] = b;
532 --cursor;
533 if (cursor == 0)
534 {
535
536 if (payloadLength == 0)
537 {
538 state = State.START;
539 return true;
540 }
541
542 maskProcessor.reset(frame);
543 state = State.PAYLOAD;
544 }
545 break;
546 }
547
548 case PAYLOAD:
549 {
550 frame.assertValid();
551 if (parsePayload(buffer))
552 {
553
554 if (frame.getOpCode() == OpCode.CLOSE)
555 {
556
557 new CloseInfo(frame);
558 }
559 state = State.START;
560
561 return true;
562 }
563 break;
564 }
565 }
566 }
567
568 return false;
569 }
570
571
572
573
574
575
576
577
578 private boolean parsePayload(ByteBuffer buffer)
579 {
580 if (payloadLength == 0)
581 {
582 return true;
583 }
584
585 if (buffer.hasRemaining())
586 {
587
588
589
590 int bytesSoFar = payload == null ? 0 : payload.position();
591 int bytesExpected = payloadLength - bytesSoFar;
592 int bytesAvailable = buffer.remaining();
593 int windowBytes = Math.min(bytesAvailable, bytesExpected);
594 int limit = buffer.limit();
595 buffer.limit(buffer.position() + windowBytes);
596 ByteBuffer window = buffer.slice();
597 buffer.limit(limit);
598 buffer.position(buffer.position() + window.remaining());
599
600 if (LOG.isDebugEnabled()) {
601 LOG.debug("{} Window: {}",policy.getBehavior(),BufferUtil.toDetailString(window));
602 }
603
604 maskProcessor.process(window);
605
606 if (window.remaining() == payloadLength)
607 {
608
609 frame.setPayload(window);
610 return true;
611 }
612 else
613 {
614 if (payload == null)
615 {
616 payload = bufferPool.acquire(payloadLength,false);
617 BufferUtil.clearToFill(payload);
618 }
619
620 payload.put(window);
621
622 if (payload.position() == payloadLength)
623 {
624 BufferUtil.flipToFlush(payload, 0);
625 frame.setPayload(payload);
626 return true;
627 }
628 }
629 }
630 return false;
631 }
632
633 public void setIncomingFramesHandler(IncomingFrames incoming)
634 {
635 this.incomingFramesHandler = incoming;
636 }
637
638 @Override
639 public String toString()
640 {
641 StringBuilder builder = new StringBuilder();
642 builder.append("Parser@").append(Integer.toHexString(hashCode()));
643 builder.append("[");
644 if (incomingFramesHandler == null)
645 {
646 builder.append("NO_HANDLER");
647 }
648 else
649 {
650 builder.append(incomingFramesHandler.getClass().getSimpleName());
651 }
652 builder.append(",s=").append(state);
653 builder.append(",c=").append(cursor);
654 builder.append(",len=").append(payloadLength);
655 builder.append(",f=").append(frame);
656 builder.append(",p=").append(policy);
657 builder.append("]");
658 return builder.toString();
659 }
660 }