1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.eclipse.jetty.websocket;
20
21 import java.io.IOException;
22 import java.io.UnsupportedEncodingException;
23 import java.security.MessageDigest;
24 import java.util.Collections;
25 import java.util.List;
26
27 import org.eclipse.jetty.io.AbstractConnection;
28 import org.eclipse.jetty.io.AsyncEndPoint;
29 import org.eclipse.jetty.io.Buffer;
30 import org.eclipse.jetty.io.ByteArrayBuffer;
31 import org.eclipse.jetty.io.Connection;
32 import org.eclipse.jetty.io.EndPoint;
33 import org.eclipse.jetty.util.B64Code;
34 import org.eclipse.jetty.util.StringUtil;
35 import org.eclipse.jetty.util.Utf8StringBuilder;
36 import org.eclipse.jetty.util.log.Log;
37 import org.eclipse.jetty.util.log.Logger;
38 import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
39 import org.eclipse.jetty.websocket.WebSocket.OnControl;
40 import org.eclipse.jetty.websocket.WebSocket.OnFrame;
41 import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
42
43 public class WebSocketConnectionD08 extends AbstractConnection implements WebSocketConnection
44 {
45 private static final Logger LOG = Log.getLogger(WebSocketConnectionD08.class);
46
47 final static byte OP_CONTINUATION = 0x00;
48 final static byte OP_TEXT = 0x01;
49 final static byte OP_BINARY = 0x02;
50 final static byte OP_EXT_DATA = 0x03;
51
52 final static byte OP_CONTROL = 0x08;
53 final static byte OP_CLOSE = 0x08;
54 final static byte OP_PING = 0x09;
55 final static byte OP_PONG = 0x0A;
56 final static byte OP_EXT_CTRL = 0x0B;
57
58 final static int CLOSE_NORMAL=1000;
59 final static int CLOSE_SHUTDOWN=1001;
60 final static int CLOSE_PROTOCOL=1002;
61 final static int CLOSE_BADDATA=1003;
62 final static int CLOSE_NOCODE=1005;
63 final static int CLOSE_NOCLOSE=1006;
64 final static int CLOSE_NOTUTF8=1007;
65
66 final static int FLAG_FIN=0x8;
67
68 final static int VERSION=8;
69
70 static boolean isLastFrame(byte flags)
71 {
72 return (flags&FLAG_FIN)!=0;
73 }
74
75 static boolean isControlFrame(byte opcode)
76 {
77 return (opcode&OP_CONTROL)!=0;
78 }
79
80 private final static byte[] MAGIC;
81 private final List<Extension> _extensions;
82 private final WebSocketParserD08 _parser;
83 private final WebSocketParser.FrameHandler _inbound;
84 private final WebSocketGeneratorD08 _generator;
85 private final WebSocketGenerator _outbound;
86 private final WebSocket _webSocket;
87 private final OnFrame _onFrame;
88 private final OnBinaryMessage _onBinaryMessage;
89 private final OnTextMessage _onTextMessage;
90 private final OnControl _onControl;
91 private final String _protocol;
92 private final int _draft;
93 private final ClassLoader _context;
94 private volatile int _closeCode;
95 private volatile String _closeMessage;
96 private volatile boolean _closedIn;
97 private volatile boolean _closedOut;
98 private int _maxTextMessageSize=-1;
99 private int _maxBinaryMessageSize=-1;
100
101 static
102 {
103 try
104 {
105 MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
106 }
107 catch (UnsupportedEncodingException e)
108 {
109 throw new RuntimeException(e);
110 }
111 }
112
113 private final WebSocketParser.FrameHandler _frameHandler= new WSFrameHandler();
114 private final WebSocket.FrameConnection _connection = new WSFrameConnection();
115
116
117
118 public WebSocketConnectionD08(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft)
119 throws IOException
120 {
121 this(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol,extensions,draft,null);
122 }
123
124
125 public WebSocketConnectionD08(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft, MaskGen maskgen)
126 throws IOException
127 {
128 super(endpoint,timestamp);
129
130 _context=Thread.currentThread().getContextClassLoader();
131
132 _draft=draft;
133 _endp.setMaxIdleTime(maxIdleTime);
134
135 _webSocket = websocket;
136 _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
137 _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
138 _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
139 _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
140 _generator = new WebSocketGeneratorD08(buffers, _endp,maskgen);
141
142 _extensions=extensions;
143 if (_extensions!=null)
144 {
145 int e=0;
146 for (Extension extension : _extensions)
147 {
148 extension.bind(
149 _connection,
150 e==extensions.size()-1?_frameHandler:extensions.get(e+1),
151 e==0?_generator:extensions.get(e-1));
152 e++;
153 }
154 }
155
156 _outbound=(_extensions==null||_extensions.size()==0)?_generator:extensions.get(extensions.size()-1);
157 _inbound=(_extensions==null||_extensions.size()==0)?_frameHandler:extensions.get(0);
158
159 _parser = new WebSocketParserD08(buffers, endpoint,_inbound,maskgen==null);
160
161 _protocol=protocol;
162
163 }
164
165
166 public WebSocket.Connection getConnection()
167 {
168 return _connection;
169 }
170
171
172 public List<Extension> getExtensions()
173 {
174 if (_extensions==null)
175 return Collections.emptyList();
176
177 return _extensions;
178 }
179
180
181 public Connection handle() throws IOException
182 {
183 Thread current = Thread.currentThread();
184 ClassLoader oldcontext = current.getContextClassLoader();
185 current.setContextClassLoader(_context);
186 try
187 {
188
189 boolean progress=true;
190
191 while (progress)
192 {
193 int flushed=_generator.flushBuffer();
194 int filled=_parser.parseNext();
195
196 progress = flushed>0 || filled>0;
197
198 if (filled<0 || flushed<0)
199 {
200 _endp.close();
201 break;
202 }
203 }
204 }
205 catch(IOException e)
206 {
207 try
208 {
209 _endp.close();
210 }
211 catch(IOException e2)
212 {
213 LOG.ignore(e2);
214 }
215 throw e;
216 }
217 finally
218 {
219 current.setContextClassLoader(oldcontext);
220 _parser.returnBuffer();
221 _generator.returnBuffer();
222 if (_endp.isOpen())
223 {
224 if (_closedIn && _closedOut && _outbound.isBufferEmpty())
225 _endp.close();
226 else if (_endp.isInputShutdown() && !_closedIn)
227 closeIn(CLOSE_NOCLOSE,null);
228 else
229 checkWriteable();
230 }
231 }
232 return this;
233 }
234
235
236 public void onInputShutdown() throws IOException
237 {
238
239 }
240
241
242 public boolean isIdle()
243 {
244 return _parser.isBufferEmpty() && _outbound.isBufferEmpty();
245 }
246
247
248 @Override
249 public void onIdleExpired(long idleForMs)
250 {
251 closeOut(WebSocketConnectionD08.CLOSE_NORMAL,"Idle for "+idleForMs+"ms > "+_endp.getMaxIdleTime()+"ms");
252 }
253
254
255 public boolean isSuspended()
256 {
257 return false;
258 }
259
260
261 public void onClose()
262 {
263 final boolean closed;
264 synchronized (this)
265 {
266 closed=_closeCode==0;
267 if (closed)
268 _closeCode=WebSocketConnectionD08.CLOSE_NOCLOSE;
269 }
270 if (closed)
271 _webSocket.onClose(WebSocketConnectionD08.CLOSE_NOCLOSE,"closed");
272 }
273
274
275 public void closeIn(int code,String message)
276 {
277 LOG.debug("ClosedIn {} {}",this,message);
278
279 final boolean closedOut;
280 final boolean closed;
281 synchronized (this)
282 {
283 closedOut=_closedOut;
284 _closedIn=true;
285 closed=_closeCode==0;
286 if (closed)
287 {
288 _closeCode=code;
289 _closeMessage=message;
290 }
291 }
292
293 try
294 {
295 if (closed)
296 _webSocket.onClose(code,message);
297 }
298 finally
299 {
300 try
301 {
302 if (closedOut)
303 _endp.close();
304 else
305 closeOut(code,message);
306 }
307 catch(IOException e)
308 {
309 LOG.ignore(e);
310 }
311 }
312 }
313
314
315 public void closeOut(int code,String message)
316 {
317 LOG.debug("ClosedOut {} {}",this,message);
318
319 final boolean close;
320 final boolean closed;
321 synchronized (this)
322 {
323 close=_closedIn || _closedOut;
324 _closedOut=true;
325 closed=_closeCode==0;
326 if (closed)
327 {
328 _closeCode=code;
329 _closeMessage=message;
330 }
331 }
332
333 try
334 {
335 if (closed)
336 _webSocket.onClose(code,message);
337 }
338 finally
339 {
340 try
341 {
342 if (close)
343 _endp.close();
344 else
345 {
346 if (code<=0)
347 code=WebSocketConnectionD08.CLOSE_NORMAL;
348 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
349 bytes[0]=(byte)(code/0x100);
350 bytes[1]=(byte)(code%0x100);
351 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD08.OP_CLOSE,bytes,0,bytes.length);
352 }
353 _outbound.flush();
354
355 }
356 catch(IOException e)
357 {
358 LOG.ignore(e);
359 }
360 }
361 }
362
363 public void shutdown()
364 {
365 final WebSocket.Connection connection = _connection;
366 if (connection != null)
367 connection.close(CLOSE_SHUTDOWN, null);
368 }
369
370
371 public void fillBuffersFrom(Buffer buffer)
372 {
373 _parser.fill(buffer);
374 }
375
376
377 private void checkWriteable()
378 {
379 if (!_outbound.isBufferEmpty() && _endp instanceof AsyncEndPoint)
380 {
381 ((AsyncEndPoint)_endp).scheduleWrite();
382 }
383 }
384
385 protected void onFrameHandshake()
386 {
387 if (_onFrame != null)
388 {
389 _onFrame.onHandshake(_connection);
390 }
391 }
392
393 protected void onWebSocketOpen()
394 {
395 _webSocket.onOpen(_connection);
396 }
397
398
399 private class WSFrameConnection implements WebSocket.FrameConnection
400 {
401 volatile boolean _disconnecting;
402
403
404 public void sendMessage(String content) throws IOException
405 {
406 if (_closedOut)
407 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
408 byte[] data = content.getBytes(StringUtil.__UTF8);
409 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD08.OP_TEXT,data,0,data.length);
410 checkWriteable();
411 }
412
413
414 public void sendMessage(byte[] content, int offset, int length) throws IOException
415 {
416 if (_closedOut)
417 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
418 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD08.OP_BINARY,content,offset,length);
419 checkWriteable();
420 }
421
422
423 public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
424 {
425 if (_closedOut)
426 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
427 _outbound.addFrame(flags,opcode,content,offset,length);
428 checkWriteable();
429 }
430
431
432 public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException
433 {
434 if (_closedOut)
435 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
436 _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
437 checkWriteable();
438 }
439
440
441 public boolean isMessageComplete(byte flags)
442 {
443 return isLastFrame(flags);
444 }
445
446
447 public boolean isOpen()
448 {
449 return _endp!=null&&_endp.isOpen();
450 }
451
452
453 public void close(int code, String message)
454 {
455 if (_disconnecting)
456 return;
457 _disconnecting=true;
458 WebSocketConnectionD08.this.closeOut(code,message);
459 }
460
461
462 public void setMaxIdleTime(int ms)
463 {
464 try
465 {
466 _endp.setMaxIdleTime(ms);
467 }
468 catch(IOException e)
469 {
470 LOG.warn(e);
471 }
472 }
473
474
475 public void setMaxTextMessageSize(int size)
476 {
477 _maxTextMessageSize=size;
478 }
479
480
481 public void setMaxBinaryMessageSize(int size)
482 {
483 _maxBinaryMessageSize=size;
484 }
485
486
487 public int getMaxIdleTime()
488 {
489 return _endp.getMaxIdleTime();
490 }
491
492
493 public int getMaxTextMessageSize()
494 {
495 return _maxTextMessageSize;
496 }
497
498
499 public int getMaxBinaryMessageSize()
500 {
501 return _maxBinaryMessageSize;
502 }
503
504
505 public String getProtocol()
506 {
507 return _protocol;
508 }
509
510
511 public byte binaryOpcode()
512 {
513 return OP_BINARY;
514 }
515
516
517 public byte textOpcode()
518 {
519 return OP_TEXT;
520 }
521
522
523 public byte continuationOpcode()
524 {
525 return OP_CONTINUATION;
526 }
527
528
529 public byte finMask()
530 {
531 return FLAG_FIN;
532 }
533
534
535 public boolean isControl(byte opcode)
536 {
537 return isControlFrame(opcode);
538 }
539
540
541 public boolean isText(byte opcode)
542 {
543 return opcode==OP_TEXT;
544 }
545
546
547 public boolean isBinary(byte opcode)
548 {
549 return opcode==OP_BINARY;
550 }
551
552
553 public boolean isContinuation(byte opcode)
554 {
555 return opcode==OP_CONTINUATION;
556 }
557
558
559 public boolean isClose(byte opcode)
560 {
561 return opcode==OP_CLOSE;
562 }
563
564
565 public boolean isPing(byte opcode)
566 {
567 return opcode==OP_PING;
568 }
569
570
571 public boolean isPong(byte opcode)
572 {
573 return opcode==OP_PONG;
574 }
575
576
577 public void disconnect()
578 {
579 close();
580 }
581
582
583 public void close()
584 {
585 close(CLOSE_NORMAL,null);
586 }
587
588
589 public void setAllowFrameFragmentation(boolean allowFragmentation)
590 {
591 _parser.setFakeFragments(allowFragmentation);
592 }
593
594
595 public boolean isAllowFrameFragmentation()
596 {
597 return _parser.isFakeFragments();
598 }
599
600
601 @Override
602 public String toString()
603 {
604 return this.getClass().getSimpleName()+"D08@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
605 }
606 }
607
608
609
610
611 private class WSFrameHandler implements WebSocketParser.FrameHandler
612 {
613 private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
614 private ByteArrayBuffer _aggregate;
615 private byte _opcode=-1;
616
617 public void onFrame(final byte flags, final byte opcode, final Buffer buffer)
618 {
619 boolean lastFrame = isLastFrame(flags);
620
621 synchronized(WebSocketConnectionD08.this)
622 {
623
624 if (_closedIn)
625 return;
626 }
627 try
628 {
629 byte[] array=buffer.array();
630
631
632 if (_onFrame!=null)
633 {
634 if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
635 return;
636 }
637
638 if (_onControl!=null && isControlFrame(opcode))
639 {
640 if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
641 return;
642 }
643
644 switch(opcode)
645 {
646 case WebSocketConnectionD08.OP_CONTINUATION:
647 {
648
649 if (_onTextMessage!=null && _opcode==WebSocketConnectionD08.OP_TEXT)
650 {
651 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
652 {
653
654 if (lastFrame)
655 {
656 _opcode=-1;
657 String msg =_utf8.toString();
658 _utf8.reset();
659 _onTextMessage.onMessage(msg);
660 }
661 }
662 else
663 textMessageTooLarge();
664 }
665
666 if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
667 {
668 if (checkBinaryMessageSize(_aggregate.length(),buffer.length()))
669 {
670 _aggregate.put(buffer);
671
672
673 if (lastFrame && _onBinaryMessage!=null)
674 {
675 try
676 {
677 _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
678 }
679 finally
680 {
681 _opcode=-1;
682 _aggregate.clear();
683 }
684 }
685 }
686 }
687 break;
688 }
689 case WebSocketConnectionD08.OP_PING:
690 {
691 LOG.debug("PING {}",this);
692 if (!_closedOut)
693 _connection.sendControl(WebSocketConnectionD08.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
694 break;
695 }
696
697 case WebSocketConnectionD08.OP_PONG:
698 {
699 LOG.debug("PONG {}",this);
700 break;
701 }
702
703 case WebSocketConnectionD08.OP_CLOSE:
704 {
705 int code=WebSocketConnectionD08.CLOSE_NOCODE;
706 String message=null;
707 if (buffer.length()>=2)
708 {
709 code=buffer.array()[buffer.getIndex()]*0x100+buffer.array()[buffer.getIndex()+1];
710 if (buffer.length()>2)
711 message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
712 }
713 closeIn(code,message);
714 break;
715 }
716
717 case WebSocketConnectionD08.OP_TEXT:
718 {
719 if(_onTextMessage!=null)
720 {
721 if (_connection.getMaxTextMessageSize()<=0)
722 {
723
724 if (lastFrame)
725 _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
726 else
727 {
728 LOG.warn("Frame discarded. Text aggregation disabled for {}",_endp);
729 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Text frame aggregation disabled");
730 }
731 }
732
733 else if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
734 {
735 if (lastFrame)
736 {
737 String msg =_utf8.toString();
738 _utf8.reset();
739 _onTextMessage.onMessage(msg);
740 }
741 else
742 {
743 _opcode=WebSocketConnectionD08.OP_TEXT;
744 }
745 }
746 else
747 textMessageTooLarge();
748 }
749 break;
750 }
751
752 default:
753 {
754 if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length()))
755 {
756 if (lastFrame)
757 {
758 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
759 }
760 else if (_connection.getMaxBinaryMessageSize()>=0)
761 {
762 _opcode=opcode;
763 if (_aggregate==null)
764 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
765 _aggregate.put(buffer);
766 }
767 else
768 {
769 LOG.warn("Frame discarded. Binary aggregation disabed for {}",_endp);
770 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Binary frame aggregation disabled");
771 }
772 }
773 }
774 }
775 }
776 catch(Throwable th)
777 {
778 LOG.warn(th);
779 }
780 }
781
782 private boolean checkBinaryMessageSize(int bufferLen, int length)
783 {
784 int max = _connection.getMaxBinaryMessageSize();
785 if (max>0 && (bufferLen+length)>max)
786 {
787 LOG.warn("Binary message too large > {}B for {}",_connection.getMaxBinaryMessageSize(),_endp);
788 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Message size > "+_connection.getMaxBinaryMessageSize());
789 _opcode=-1;
790 if (_aggregate!=null)
791 _aggregate.clear();
792 return false;
793 }
794 return true;
795 }
796
797 private void textMessageTooLarge()
798 {
799 LOG.warn("Text message too large > {} chars for {}",_connection.getMaxTextMessageSize(),_endp);
800 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
801
802 _opcode=-1;
803 _utf8.reset();
804 }
805
806 public void close(int code,String message)
807 {
808 if (code!=CLOSE_NORMAL)
809 LOG.warn("Close: "+code+" "+message);
810 _connection.close(code,message);
811 }
812
813 @Override
814 public String toString()
815 {
816 return WebSocketConnectionD08.this.toString()+"FH";
817 }
818 }
819
820
821 public static String hashKey(String key)
822 {
823 try
824 {
825 MessageDigest md = MessageDigest.getInstance("SHA1");
826 md.update(key.getBytes("UTF-8"));
827 md.update(MAGIC);
828 return new String(B64Code.encode(md.digest()));
829 }
830 catch (Exception e)
831 {
832 throw new RuntimeException(e);
833 }
834 }
835
836
837 @Override
838 public String toString()
839 {
840 return String.format("WS/D%d p=%s g=%s", _draft, _parser, _generator);
841 }
842 }