1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket.common;
20
21 import java.nio.ByteBuffer;
22 import java.util.List;
23
24 import org.eclipse.jetty.io.ByteBufferPool;
25 import org.eclipse.jetty.util.BufferUtil;
26 import org.eclipse.jetty.util.log.Log;
27 import org.eclipse.jetty.util.log.Logger;
28 import org.eclipse.jetty.websocket.api.MessageTooLargeException;
29 import org.eclipse.jetty.websocket.api.ProtocolException;
30 import org.eclipse.jetty.websocket.api.WebSocketBehavior;
31 import org.eclipse.jetty.websocket.api.WebSocketException;
32 import org.eclipse.jetty.websocket.api.WebSocketPolicy;
33 import org.eclipse.jetty.websocket.api.extensions.Extension;
34 import org.eclipse.jetty.websocket.api.extensions.Frame;
35 import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
36 import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
37 import org.eclipse.jetty.websocket.common.frames.CloseFrame;
38 import org.eclipse.jetty.websocket.common.frames.ContinuationFrame;
39 import org.eclipse.jetty.websocket.common.frames.ControlFrame;
40 import org.eclipse.jetty.websocket.common.frames.PingFrame;
41 import org.eclipse.jetty.websocket.common.frames.PongFrame;
42 import org.eclipse.jetty.websocket.common.frames.TextFrame;
43 import org.eclipse.jetty.websocket.common.io.payload.DeMaskProcessor;
44 import org.eclipse.jetty.websocket.common.io.payload.PayloadProcessor;
45
46
47
48
49 public class Parser
50 {
51 private enum State
52 {
53 START,
54 PAYLOAD_LEN,
55 PAYLOAD_LEN_BYTES,
56 MASK,
57 MASK_BYTES,
58 PAYLOAD
59 }
60
61 private static final Logger LOG = Log.getLogger(Parser.class);
62 private final WebSocketPolicy policy;
63 private final ByteBufferPool bufferPool;
64
65
66 private State state = State.START;
67 private int cursor = 0;
68
69 private WebSocketFrame frame;
70 private boolean priorDataFrame;
71
72 private ByteBuffer payload;
73 private int payloadLength;
74 private PayloadProcessor maskProcessor = new DeMaskProcessor();
75
76
77
78
79
80
81
82
83
84
85
86
87 private byte flagsInUse=0x00;
88
89 private IncomingFrames incomingFramesHandler;
90
91 public Parser(WebSocketPolicy wspolicy, ByteBufferPool bufferPool)
92 {
93 this.bufferPool = bufferPool;
94 this.policy = wspolicy;
95 }
96
97 private void assertSanePayloadLength(long len)
98 {
99 if (LOG.isDebugEnabled()) {
100 LOG.debug("{} Payload Length: {} - {}",policy.getBehavior(),len,this);
101 }
102
103
104 if (len > Integer.MAX_VALUE)
105 {
106
107 throw new MessageTooLargeException("[int-sane!] cannot handle payload lengths larger than " + Integer.MAX_VALUE);
108 }
109
110 switch (frame.getOpCode())
111 {
112 case OpCode.CLOSE:
113 if (len == 1)
114 {
115 throw new ProtocolException("Invalid close frame payload length, [" + payloadLength + "]");
116 }
117
118 case OpCode.PING:
119 case OpCode.PONG:
120 if (len > ControlFrame.MAX_CONTROL_PAYLOAD)
121 {
122 throw new ProtocolException("Invalid control frame payload length, [" + payloadLength + "] cannot exceed ["
123 + ControlFrame.MAX_CONTROL_PAYLOAD + "]");
124 }
125 break;
126 case OpCode.TEXT:
127 policy.assertValidTextMessageSize((int)len);
128 break;
129 case OpCode.BINARY:
130 policy.assertValidBinaryMessageSize((int)len);
131 break;
132 }
133 }
134
135 public void configureFromExtensions(List<? extends Extension> exts)
136 {
137
138 flagsInUse = 0x00;
139
140
141 for (Extension ext : exts)
142 {
143 if (ext.isRsv1User())
144 {
145 flagsInUse = (byte)(flagsInUse | 0x40);
146 }
147 if (ext.isRsv2User())
148 {
149 flagsInUse = (byte)(flagsInUse | 0x20);
150 }
151 if (ext.isRsv3User())
152 {
153 flagsInUse = (byte)(flagsInUse | 0x10);
154 }
155 }
156 }
157
158 public IncomingFrames getIncomingFramesHandler()
159 {
160 return incomingFramesHandler;
161 }
162
163 public WebSocketPolicy getPolicy()
164 {
165 return policy;
166 }
167
168 public boolean isRsv1InUse()
169 {
170 return (flagsInUse & 0x40) != 0;
171 }
172
173 public boolean isRsv2InUse()
174 {
175 return (flagsInUse & 0x20) != 0;
176 }
177
178 public boolean isRsv3InUse()
179 {
180 return (flagsInUse & 0x10) != 0;
181 }
182
183 protected void notifyFrame(final Frame f)
184 {
185 if (LOG.isDebugEnabled())
186 LOG.debug("{} Notify {}",policy.getBehavior(),getIncomingFramesHandler());
187
188 if (policy.getBehavior() == WebSocketBehavior.SERVER)
189 {
190
191
192
193
194
195
196
197
198
199
200 if (!f.isMasked())
201 {
202 throw new ProtocolException("Client MUST mask all frames (RFC-6455: Section 5.1)");
203 }
204 }
205 else if(policy.getBehavior() == WebSocketBehavior.CLIENT)
206 {
207
208 if (f.isMasked())
209 {
210 throw new ProtocolException("Server MUST NOT mask any frames (RFC-6455: Section 5.1)");
211 }
212 }
213
214 if (incomingFramesHandler == null)
215 {
216 return;
217 }
218 try
219 {
220 incomingFramesHandler.incomingFrame(f);
221 }
222 catch (WebSocketException e)
223 {
224 notifyWebSocketException(e);
225 }
226 catch (Throwable t)
227 {
228 LOG.warn(t);
229 notifyWebSocketException(new WebSocketException(t));
230 }
231 }
232
233 protected void notifyWebSocketException(WebSocketException e)
234 {
235 LOG.warn(e);
236 if (incomingFramesHandler == null)
237 {
238 return;
239 }
240 incomingFramesHandler.incomingError(e);
241 }
242
243 public void parse(ByteBuffer buffer) throws WebSocketException
244 {
245 if (buffer.remaining() <= 0)
246 {
247 return;
248 }
249 try
250 {
251
252 while (parseFrame(buffer))
253 {
254 if (LOG.isDebugEnabled())
255 LOG.debug("{} Parsed Frame: {}",policy.getBehavior(),frame);
256 notifyFrame(frame);
257 if (frame.isDataFrame())
258 {
259 priorDataFrame = !frame.isFin();
260 }
261 reset();
262 }
263 }
264 catch (WebSocketException e)
265 {
266 buffer.position(buffer.limit());
267 reset();
268
269 notifyWebSocketException(e);
270
271 throw e;
272 }
273 catch (Throwable t)
274 {
275 buffer.position(buffer.limit());
276 reset();
277
278 WebSocketException e = new WebSocketException(t);
279 notifyWebSocketException(e);
280
281 throw e;
282 }
283 }
284
285 private void reset()
286 {
287 if (frame != null)
288 frame.reset();
289 frame = null;
290 bufferPool.release(payload);
291 payload = null;
292 }
293
294
295
296
297
298
299
300
301
302
303
304
305 private boolean parseFrame(ByteBuffer buffer)
306 {
307 if (LOG.isDebugEnabled())
308 {
309 LOG.debug("{} Parsing {} bytes",policy.getBehavior(),buffer.remaining());
310 }
311 while (buffer.hasRemaining())
312 {
313 switch (state)
314 {
315 case START:
316 {
317
318 byte b = buffer.get();
319 boolean fin = ((b & 0x80) != 0);
320
321 byte opcode = (byte)(b & 0x0F);
322
323 if (!OpCode.isKnown(opcode))
324 {
325 throw new ProtocolException("Unknown opcode: " + opcode);
326 }
327
328 if (LOG.isDebugEnabled())
329 LOG.debug("{} OpCode {}, fin={} rsv={}{}{}",
330 policy.getBehavior(),
331 OpCode.name(opcode),
332 fin,
333 (((b & 0x40) != 0)?'1':'.'),
334 (((b & 0x20) != 0)?'1':'.'),
335 (((b & 0x10) != 0)?'1':'.'));
336
337
338 switch(opcode)
339 {
340 case OpCode.TEXT:
341 frame = new TextFrame();
342
343 if (priorDataFrame)
344 {
345 throw new ProtocolException("Unexpected " + OpCode.name(opcode) + " frame, was expecting CONTINUATION");
346 }
347 break;
348 case OpCode.BINARY:
349 frame = new BinaryFrame();
350
351 if (priorDataFrame)
352 {
353 throw new ProtocolException("Unexpected " + OpCode.name(opcode) + " frame, was expecting CONTINUATION");
354 }
355 break;
356 case OpCode.CONTINUATION:
357 frame = new ContinuationFrame();
358
359 if (!priorDataFrame)
360 {
361 throw new ProtocolException("CONTINUATION frame without prior !FIN");
362 }
363
364 break;
365 case OpCode.CLOSE:
366 frame = new CloseFrame();
367
368 if (!fin)
369 {
370 throw new ProtocolException("Fragmented Close Frame [" + OpCode.name(opcode) + "]");
371 }
372 break;
373 case OpCode.PING:
374 frame = new PingFrame();
375
376 if (!fin)
377 {
378 throw new ProtocolException("Fragmented Ping Frame [" + OpCode.name(opcode) + "]");
379 }
380 break;
381 case OpCode.PONG:
382 frame = new PongFrame();
383
384 if (!fin)
385 {
386 throw new ProtocolException("Fragmented Pong Frame [" + OpCode.name(opcode) + "]");
387 }
388 break;
389 }
390
391 frame.setFin(fin);
392
393
394 if ((b & 0x70) != 0)
395 {
396
397
398
399
400
401
402 if ((b & 0x40) != 0)
403 {
404 if (isRsv1InUse())
405 frame.setRsv1(true);
406 else
407 throw new ProtocolException("RSV1 not allowed to be set");
408 }
409 if ((b & 0x20) != 0)
410 {
411 if (isRsv2InUse())
412 frame.setRsv2(true);
413 else
414 throw new ProtocolException("RSV2 not allowed to be set");
415 }
416 if ((b & 0x10) != 0)
417 {
418 if (isRsv3InUse())
419 frame.setRsv3(true);
420 else
421 throw new ProtocolException("RSV3 not allowed to be set");
422 }
423 }
424
425 state = State.PAYLOAD_LEN;
426 break;
427 }
428
429 case PAYLOAD_LEN:
430 {
431 byte b = buffer.get();
432 frame.setMasked((b & 0x80) != 0);
433 payloadLength = (byte)(0x7F & b);
434
435 if (payloadLength == 127)
436 {
437
438 payloadLength = 0;
439 state = State.PAYLOAD_LEN_BYTES;
440 cursor = 8;
441 break;
442 }
443 else if (payloadLength == 126)
444 {
445
446 payloadLength = 0;
447 state = State.PAYLOAD_LEN_BYTES;
448 cursor = 2;
449 break;
450 }
451
452 assertSanePayloadLength(payloadLength);
453 if (frame.isMasked())
454 {
455 state = State.MASK;
456 }
457 else
458 {
459
460 if (payloadLength == 0)
461 {
462 state = State.START;
463 return true;
464 }
465
466 maskProcessor.reset(frame);
467 state = State.PAYLOAD;
468 }
469
470 break;
471 }
472
473 case PAYLOAD_LEN_BYTES:
474 {
475 byte b = buffer.get();
476 --cursor;
477 payloadLength |= (b & 0xFF) << (8 * cursor);
478 if (cursor == 0)
479 {
480 assertSanePayloadLength(payloadLength);
481 if (frame.isMasked())
482 {
483 state = State.MASK;
484 }
485 else
486 {
487
488 if (payloadLength == 0)
489 {
490 state = State.START;
491 return true;
492 }
493
494 maskProcessor.reset(frame);
495 state = State.PAYLOAD;
496 }
497 }
498 break;
499 }
500
501 case MASK:
502 {
503 byte m[] = new byte[4];
504 frame.setMask(m);
505 if (buffer.remaining() >= 4)
506 {
507 buffer.get(m,0,4);
508
509 if (payloadLength == 0)
510 {
511 state = State.START;
512 return true;
513 }
514
515 maskProcessor.reset(frame);
516 state = State.PAYLOAD;
517 }
518 else
519 {
520 state = State.MASK_BYTES;
521 cursor = 4;
522 }
523 break;
524 }
525
526 case MASK_BYTES:
527 {
528 byte b = buffer.get();
529 frame.getMask()[4 - cursor] = b;
530 --cursor;
531 if (cursor == 0)
532 {
533
534 if (payloadLength == 0)
535 {
536 state = State.START;
537 return true;
538 }
539
540 maskProcessor.reset(frame);
541 state = State.PAYLOAD;
542 }
543 break;
544 }
545
546 case PAYLOAD:
547 {
548 frame.assertValid();
549 if (parsePayload(buffer))
550 {
551
552 if (frame.getOpCode() == OpCode.CLOSE)
553 {
554
555 new CloseInfo(frame);
556 }
557 state = State.START;
558
559 return true;
560 }
561 break;
562 }
563 }
564 }
565
566 return false;
567 }
568
569
570
571
572
573
574
575
576 private boolean parsePayload(ByteBuffer buffer)
577 {
578 if (payloadLength == 0)
579 {
580 return true;
581 }
582
583 if (buffer.hasRemaining())
584 {
585
586
587
588 int bytesSoFar = payload == null ? 0 : payload.position();
589 int bytesExpected = payloadLength - bytesSoFar;
590 int bytesAvailable = buffer.remaining();
591 int windowBytes = Math.min(bytesAvailable, bytesExpected);
592 int limit = buffer.limit();
593 buffer.limit(buffer.position() + windowBytes);
594 ByteBuffer window = buffer.slice();
595 buffer.limit(limit);
596 buffer.position(buffer.position() + window.remaining());
597
598 if (LOG.isDebugEnabled()) {
599 LOG.debug("{} Window: {}",policy.getBehavior(),BufferUtil.toDetailString(window));
600 }
601
602 maskProcessor.process(window);
603
604 if (window.remaining() == payloadLength)
605 {
606
607 frame.setPayload(window);
608 return true;
609 }
610 else
611 {
612 if (payload == null)
613 {
614 payload = bufferPool.acquire(payloadLength,false);
615 BufferUtil.clearToFill(payload);
616 }
617
618 payload.put(window);
619
620 if (payload.position() == payloadLength)
621 {
622 BufferUtil.flipToFlush(payload, 0);
623 frame.setPayload(payload);
624 return true;
625 }
626 }
627 }
628 return false;
629 }
630
631 public void setIncomingFramesHandler(IncomingFrames incoming)
632 {
633 this.incomingFramesHandler = incoming;
634 }
635
636 @Override
637 public String toString()
638 {
639 StringBuilder builder = new StringBuilder();
640 builder.append("Parser@").append(Integer.toHexString(hashCode()));
641 builder.append("[");
642 if (incomingFramesHandler == null)
643 {
644 builder.append("NO_HANDLER");
645 }
646 else
647 {
648 builder.append(incomingFramesHandler.getClass().getSimpleName());
649 }
650 builder.append(",s=").append(state);
651 builder.append(",c=").append(cursor);
652 builder.append(",len=").append(payloadLength);
653 builder.append(",f=").append(frame);
654
655 builder.append("]");
656 return builder.toString();
657 }
658 }