1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 package org.eclipse.jetty.websocket;
30
31 import java.io.IOException;
32 import java.io.UnsupportedEncodingException;
33 import java.security.MessageDigest;
34 import java.util.Collections;
35 import java.util.List;
36
37 import org.eclipse.jetty.io.AbstractConnection;
38 import org.eclipse.jetty.io.AsyncEndPoint;
39 import org.eclipse.jetty.io.Buffer;
40 import org.eclipse.jetty.io.ByteArrayBuffer;
41 import org.eclipse.jetty.io.Connection;
42 import org.eclipse.jetty.io.EndPoint;
43 import org.eclipse.jetty.util.B64Code;
44 import org.eclipse.jetty.util.StringUtil;
45 import org.eclipse.jetty.util.Utf8StringBuilder;
46 import org.eclipse.jetty.util.log.Log;
47 import org.eclipse.jetty.util.log.Logger;
48 import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
49 import org.eclipse.jetty.websocket.WebSocket.OnControl;
50 import org.eclipse.jetty.websocket.WebSocket.OnFrame;
51 import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
52
53 public class WebSocketConnectionD08 extends AbstractConnection implements WebSocketConnection
54 {
/** Logger for this connection class. */
private static final Logger LOG = Log.getLogger(WebSocketConnectionD08.class);

// Opcodes without the OP_CONTROL bit set.
final static byte OP_CONTINUATION = 0x00;
final static byte OP_TEXT = 0x01;
final static byte OP_BINARY = 0x02;
final static byte OP_EXT_DATA = 0x03;

// OP_CONTROL is the bit tested by isControlFrame(byte); the opcodes below all have it set.
final static byte OP_CONTROL = 0x08;
final static byte OP_CLOSE = 0x08;
final static byte OP_PING = 0x09;
final static byte OP_PONG = 0x0A;
final static byte OP_EXT_CTRL = 0x0B;

// Close status codes passed to closeIn/closeOut and reported through WebSocket.onClose.
final static int CLOSE_NORMAL=1000;
final static int CLOSE_SHUTDOWN=1001;
final static int CLOSE_PROTOCOL=1002;
final static int CLOSE_BADDATA=1003;
final static int CLOSE_NOCODE=1005;   // used when a received close frame carries no code
final static int CLOSE_NOCLOSE=1006;  // used when the connection ends without a close frame
final static int CLOSE_NOTUTF8=1007;

/** Flag bit tested by isLastFrame(byte) to detect the final frame of a message. */
final static int FLAG_FIN=0x8;

/** Version number constant; not referenced elsewhere in the visible part of this class. */
final static int VERSION=8;
79
80 static boolean isLastFrame(byte flags)
81 {
82 return (flags&FLAG_FIN)!=0;
83 }
84
85 static boolean isControlFrame(byte opcode)
86 {
87 return (opcode&OP_CONTROL)!=0;
88 }
89
/** Bytes of the fixed GUID string that hashKey(String) appends to the key before digesting. */
private final static byte[] MAGIC;
/** Extensions given to the constructor; may be null (getExtensions() then returns an empty list). */
private final List<Extension> _extensions;
private final WebSocketParserD08 _parser;
/** Receives parsed frames: the first extension, or _frameHandler when there are no extensions. */
private final WebSocketParser.FrameHandler _inbound;
private final WebSocketGeneratorD08 _generator;
/** Accepts frames to send: the last extension, or _generator when there are no extensions. */
private final WebSocketGenerator _outbound;
private final WebSocket _webSocket;
// The following four are _webSocket cast to the optional callback interfaces, or null if not implemented.
private final OnFrame _onFrame;
private final OnBinaryMessage _onBinaryMessage;
private final OnTextMessage _onTextMessage;
private final OnControl _onControl;
private final String _protocol;
private final int _draft;
/** Context class loader of the constructing thread; installed for the duration of handle(). */
private final ClassLoader _context;
/** Close code; 0 until the first of onClose/closeIn/closeOut records one. */
private volatile int _closeCode;
private volatile String _closeMessage;
private volatile boolean _closedIn;
private volatile boolean _closedOut;
/** Maximum aggregated text message size; values &lt;= 0 disable aggregation of fragmented text messages. */
private int _maxTextMessageSize=-1;
/** Maximum aggregated binary message size; negative values disable aggregation of fragmented binary messages. */
private int _maxBinaryMessageSize=-1;

// Initialise MAGIC once; an unsupported charset is reported as a RuntimeException at class-load time.
static
{
    try
    {
        MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
    }
    catch (UnsupportedEncodingException e)
    {
        throw new RuntimeException(e);
    }
}

/** Terminal inbound handler that dispatches frames to the WebSocket callbacks. */
private final WebSocketParser.FrameHandler _frameHandler= new WSFrameHandler();
/** Connection facade handed to the WebSocket and to extensions. */
private final WebSocket.FrameConnection _connection = new WSFrameConnection();
125
126
127
/**
 * Creates a connection without a mask generator by delegating to the
 * nine-argument constructor with a null {@code maskgen}.
 *
 * @throws IOException if the delegated constructor fails
 */
public WebSocketConnectionD08(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft)
    throws IOException
{
    this(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol,extensions,draft,null);
}
133
134
135 public WebSocketConnectionD08(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft, MaskGen maskgen)
136 throws IOException
137 {
138 super(endpoint,timestamp);
139
140 _context=Thread.currentThread().getContextClassLoader();
141
142 _draft=draft;
143 _endp.setMaxIdleTime(maxIdleTime);
144
145 _webSocket = websocket;
146 _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
147 _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
148 _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
149 _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
150 _generator = new WebSocketGeneratorD08(buffers, _endp,maskgen);
151
152 _extensions=extensions;
153 if (_extensions!=null)
154 {
155 int e=0;
156 for (Extension extension : _extensions)
157 {
158 extension.bind(
159 _connection,
160 e==extensions.size()-1?_frameHandler:extensions.get(e+1),
161 e==0?_generator:extensions.get(e-1));
162 e++;
163 }
164 }
165
166 _outbound=(_extensions==null||_extensions.size()==0)?_generator:extensions.get(extensions.size()-1);
167 _inbound=(_extensions==null||_extensions.size()==0)?_frameHandler:extensions.get(0);
168
169 _parser = new WebSocketParserD08(buffers, endpoint,_inbound,maskgen==null);
170
171 _protocol=protocol;
172
173 }
174
175
/**
 * @return the connection facade created with this object
 */
public WebSocket.Connection getConnection()
{
    return _connection;
}
180
181
182 public List<Extension> getExtensions()
183 {
184 if (_extensions==null)
185 return Collections.emptyList();
186
187 return _extensions;
188 }
189
190
/**
 * Repeatedly flushes pending output and parses available input until neither
 * makes progress, then releases buffers and settles the close state.
 *
 * @return this connection
 * @throws IOException if flushing or parsing fails; the endpoint is closed before rethrowing
 */
public Connection handle() throws IOException
{
    // Run under the class loader captured at construction; restored in the finally block.
    Thread current = Thread.currentThread();
    ClassLoader oldcontext = current.getContextClassLoader();
    current.setContextClassLoader(_context);
    try
    {

        boolean progress=true;

        while (progress)
        {
            int flushed=_generator.flushBuffer();
            int filled=_parser.parseNext();

            progress = flushed>0 || filled>0;

            // A negative result from either side ends the connection.
            if (filled<0 || flushed<0)
            {
                _endp.close();
                break;
            }
        }
    }
    catch(IOException e)
    {
        // Best-effort close; a secondary failure must not mask the original exception.
        try
        {
            _endp.close();
        }
        catch(IOException e2)
        {
            LOG.ignore(e2);
        }
        throw e;
    }
    finally
    {
        current.setContextClassLoader(oldcontext);
        _parser.returnBuffer();
        _generator.returnBuffer();
        if (_endp.isOpen())
        {
            // Both directions closed and nothing left to send: finish the close.
            if (_closedIn && _closedOut && _outbound.isBufferEmpty())
                _endp.close();
            // Input ended without a close having been received: record CLOSE_NOCLOSE.
            else if (_endp.isInputShutdown() && !_closedIn)
                closeIn(CLOSE_NOCLOSE,null);
            // Otherwise make sure any remaining output gets scheduled.
            else
                checkWriteable();
        }
    }
    return this;
}
244
245
/**
 * Intentionally empty: handle() checks the endpoint's input-shutdown state itself.
 */
public void onInputShutdown() throws IOException
{

}
250
251
/**
 * @return true if both the parser buffer and the outbound buffer are empty
 */
public boolean isIdle()
{
    return _parser.isBufferEmpty() && _outbound.isBufferEmpty();
}
256
257
/**
 * Closes the outbound side with CLOSE_NORMAL and a message reporting the idle
 * duration and the endpoint's configured maximum idle time.
 *
 * @param idleForMs how long the connection has been idle, in milliseconds
 */
@Override
public void onIdleExpired(long idleForMs)
{
    closeOut(WebSocketConnectionD08.CLOSE_NORMAL,"Idle for "+idleForMs+"ms > "+_endp.getMaxIdleTime()+"ms");
}
263
264
/**
 * @return always false
 */
public boolean isSuspended()
{
    return false;
}
269
270
271 public void onClose()
272 {
273 final boolean closed;
274 synchronized (this)
275 {
276 closed=_closeCode==0;
277 if (closed)
278 _closeCode=WebSocketConnectionD08.CLOSE_NOCLOSE;
279 }
280 if (closed)
281 _webSocket.onClose(WebSocketConnectionD08.CLOSE_NOCLOSE,"closed");
282 }
283
284
285 public void closeIn(int code,String message)
286 {
287 LOG.debug("ClosedIn {} {}",this,message);
288
289 final boolean closedOut;
290 final boolean closed;
291 synchronized (this)
292 {
293 closedOut=_closedOut;
294 _closedIn=true;
295 closed=_closeCode==0;
296 if (closed)
297 {
298 _closeCode=code;
299 _closeMessage=message;
300 }
301 }
302
303 try
304 {
305 if (closed)
306 _webSocket.onClose(code,message);
307 }
308 finally
309 {
310 try
311 {
312 if (closedOut)
313 _endp.close();
314 else
315 closeOut(code,message);
316 }
317 catch(IOException e)
318 {
319 LOG.ignore(e);
320 }
321 }
322 }
323
324
325 public void closeOut(int code,String message)
326 {
327 LOG.debug("ClosedOut {} {}",this,message);
328
329 final boolean close;
330 final boolean closed;
331 synchronized (this)
332 {
333 close=_closedIn || _closedOut;
334 _closedOut=true;
335 closed=_closeCode==0;
336 if (closed)
337 {
338 _closeCode=code;
339 _closeMessage=message;
340 }
341 }
342
343 try
344 {
345 if (closed)
346 _webSocket.onClose(code,message);
347 }
348 finally
349 {
350 try
351 {
352 if (close)
353 _endp.close();
354 else
355 {
356 if (code<=0)
357 code=WebSocketConnectionD08.CLOSE_NORMAL;
358 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
359 bytes[0]=(byte)(code/0x100);
360 bytes[1]=(byte)(code%0x100);
361 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD08.OP_CLOSE,bytes,0,bytes.length);
362 }
363 _outbound.flush();
364
365 }
366 catch(IOException e)
367 {
368 LOG.ignore(e);
369 }
370 }
371 }
372
/**
 * Closes the connection facade with CLOSE_SHUTDOWN and no message.
 */
public void shutdown()
{
    final WebSocket.Connection connection = _connection;
    if (connection != null)
        connection.close(CLOSE_SHUTDOWN, null);
}
379
380
/**
 * Passes the given buffer to the parser's fill method.
 *
 * @param buffer the buffer handed to the parser
 */
public void fillBuffersFrom(Buffer buffer)
{
    _parser.fill(buffer);
}
385
386
387 private void checkWriteable()
388 {
389 if (!_outbound.isBufferEmpty() && _endp instanceof AsyncEndPoint)
390 {
391 ((AsyncEndPoint)_endp).scheduleWrite();
392 }
393 }
394
395 protected void onFrameHandshake()
396 {
397 if (_onFrame != null)
398 {
399 _onFrame.onHandshake(_connection);
400 }
401 }
402
/**
 * Invokes the WebSocket's onOpen callback with the connection facade.
 */
protected void onWebSocketOpen()
{
    _webSocket.onOpen(_connection);
}
407
408
409 private class WSFrameConnection implements WebSocket.FrameConnection
410 {
411 volatile boolean _disconnecting;
412
413
414 public void sendMessage(String content) throws IOException
415 {
416 if (_closedOut)
417 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
418 byte[] data = content.getBytes(StringUtil.__UTF8);
419 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD08.OP_TEXT,data,0,data.length);
420 checkWriteable();
421 }
422
423
424 public void sendMessage(byte[] content, int offset, int length) throws IOException
425 {
426 if (_closedOut)
427 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
428 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD08.OP_BINARY,content,offset,length);
429 checkWriteable();
430 }
431
432
433 public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
434 {
435 if (_closedOut)
436 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
437 _outbound.addFrame(flags,opcode,content,offset,length);
438 checkWriteable();
439 }
440
441
442 public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException
443 {
444 if (_closedOut)
445 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
446 _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
447 checkWriteable();
448 }
449
450
451 public boolean isMessageComplete(byte flags)
452 {
453 return isLastFrame(flags);
454 }
455
456
457 public boolean isOpen()
458 {
459 return _endp!=null&&_endp.isOpen();
460 }
461
462
463 public void close(int code, String message)
464 {
465 if (_disconnecting)
466 return;
467 _disconnecting=true;
468 WebSocketConnectionD08.this.closeOut(code,message);
469 }
470
471
472 public void setMaxIdleTime(int ms)
473 {
474 try
475 {
476 _endp.setMaxIdleTime(ms);
477 }
478 catch(IOException e)
479 {
480 LOG.warn(e);
481 }
482 }
483
484
485 public void setMaxTextMessageSize(int size)
486 {
487 _maxTextMessageSize=size;
488 }
489
490
491 public void setMaxBinaryMessageSize(int size)
492 {
493 _maxBinaryMessageSize=size;
494 }
495
496
497 public int getMaxIdleTime()
498 {
499 return _endp.getMaxIdleTime();
500 }
501
502
503 public int getMaxTextMessageSize()
504 {
505 return _maxTextMessageSize;
506 }
507
508
509 public int getMaxBinaryMessageSize()
510 {
511 return _maxBinaryMessageSize;
512 }
513
514
515 public String getProtocol()
516 {
517 return _protocol;
518 }
519
520
521 public byte binaryOpcode()
522 {
523 return OP_BINARY;
524 }
525
526
527 public byte textOpcode()
528 {
529 return OP_TEXT;
530 }
531
532
533 public byte continuationOpcode()
534 {
535 return OP_CONTINUATION;
536 }
537
538
539 public byte finMask()
540 {
541 return FLAG_FIN;
542 }
543
544
545 public boolean isControl(byte opcode)
546 {
547 return isControlFrame(opcode);
548 }
549
550
551 public boolean isText(byte opcode)
552 {
553 return opcode==OP_TEXT;
554 }
555
556
557 public boolean isBinary(byte opcode)
558 {
559 return opcode==OP_BINARY;
560 }
561
562
563 public boolean isContinuation(byte opcode)
564 {
565 return opcode==OP_CONTINUATION;
566 }
567
568
569 public boolean isClose(byte opcode)
570 {
571 return opcode==OP_CLOSE;
572 }
573
574
575 public boolean isPing(byte opcode)
576 {
577 return opcode==OP_PING;
578 }
579
580
581 public boolean isPong(byte opcode)
582 {
583 return opcode==OP_PONG;
584 }
585
586
587 public void disconnect()
588 {
589 close();
590 }
591
592
593 public void close()
594 {
595 close(CLOSE_NORMAL,null);
596 }
597
598
599 public void setAllowFrameFragmentation(boolean allowFragmentation)
600 {
601 _parser.setFakeFragments(allowFragmentation);
602 }
603
604
605 public boolean isAllowFrameFragmentation()
606 {
607 return _parser.isFakeFragments();
608 }
609
610
611 @Override
612 public String toString()
613 {
614 return this.getClass().getSimpleName()+"D08@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
615 }
616 }
617
618
619
620
621 private class WSFrameHandler implements WebSocketParser.FrameHandler
622 {
623 private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
624 private ByteArrayBuffer _aggregate;
625 private byte _opcode=-1;
626
627 public void onFrame(final byte flags, final byte opcode, final Buffer buffer)
628 {
629 boolean lastFrame = isLastFrame(flags);
630
631 synchronized(WebSocketConnectionD08.this)
632 {
633
634 if (_closedIn)
635 return;
636 }
637 try
638 {
639 byte[] array=buffer.array();
640
641
642 if (_onFrame!=null)
643 {
644 if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
645 return;
646 }
647
648 if (_onControl!=null && isControlFrame(opcode))
649 {
650 if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
651 return;
652 }
653
654 switch(opcode)
655 {
656 case WebSocketConnectionD08.OP_CONTINUATION:
657 {
658
659 if (_onTextMessage!=null && _opcode==WebSocketConnectionD08.OP_TEXT)
660 {
661 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
662 {
663
664 if (lastFrame)
665 {
666 _opcode=-1;
667 String msg =_utf8.toString();
668 _utf8.reset();
669 _onTextMessage.onMessage(msg);
670 }
671 }
672 else
673 textMessageTooLarge();
674 }
675
676 if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
677 {
678 if (checkBinaryMessageSize(_aggregate.length(),buffer.length()))
679 {
680 _aggregate.put(buffer);
681
682
683 if (lastFrame && _onBinaryMessage!=null)
684 {
685 try
686 {
687 _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
688 }
689 finally
690 {
691 _opcode=-1;
692 _aggregate.clear();
693 }
694 }
695 }
696 }
697 break;
698 }
699 case WebSocketConnectionD08.OP_PING:
700 {
701 LOG.debug("PING {}",this);
702 if (!_closedOut)
703 _connection.sendControl(WebSocketConnectionD08.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
704 break;
705 }
706
707 case WebSocketConnectionD08.OP_PONG:
708 {
709 LOG.debug("PONG {}",this);
710 break;
711 }
712
713 case WebSocketConnectionD08.OP_CLOSE:
714 {
715 int code=WebSocketConnectionD08.CLOSE_NOCODE;
716 String message=null;
717 if (buffer.length()>=2)
718 {
719 code=buffer.array()[buffer.getIndex()]*0x100+buffer.array()[buffer.getIndex()+1];
720 if (buffer.length()>2)
721 message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
722 }
723 closeIn(code,message);
724 break;
725 }
726
727 case WebSocketConnectionD08.OP_TEXT:
728 {
729 if(_onTextMessage!=null)
730 {
731 if (_connection.getMaxTextMessageSize()<=0)
732 {
733
734 if (lastFrame)
735 _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
736 else
737 {
738 LOG.warn("Frame discarded. Text aggregation disabled for {}",_endp);
739 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Text frame aggregation disabled");
740 }
741 }
742
743 else if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
744 {
745 if (lastFrame)
746 {
747 String msg =_utf8.toString();
748 _utf8.reset();
749 _onTextMessage.onMessage(msg);
750 }
751 else
752 {
753 _opcode=WebSocketConnectionD08.OP_TEXT;
754 }
755 }
756 else
757 textMessageTooLarge();
758 }
759 break;
760 }
761
762 default:
763 {
764 if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length()))
765 {
766 if (lastFrame)
767 {
768 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
769 }
770 else if (_connection.getMaxBinaryMessageSize()>=0)
771 {
772 _opcode=opcode;
773 if (_aggregate==null)
774 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
775 _aggregate.put(buffer);
776 }
777 else
778 {
779 LOG.warn("Frame discarded. Binary aggregation disabed for {}",_endp);
780 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Binary frame aggregation disabled");
781 }
782 }
783 }
784 }
785 }
786 catch(Throwable th)
787 {
788 LOG.warn(th);
789 }
790 }
791
792 private boolean checkBinaryMessageSize(int bufferLen, int length)
793 {
794 int max = _connection.getMaxBinaryMessageSize();
795 if (max>0 && (bufferLen+length)>max)
796 {
797 LOG.warn("Binary message too large > {}B for {}",_connection.getMaxBinaryMessageSize(),_endp);
798 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Message size > "+_connection.getMaxBinaryMessageSize());
799 _opcode=-1;
800 if (_aggregate!=null)
801 _aggregate.clear();
802 return false;
803 }
804 return true;
805 }
806
807 private void textMessageTooLarge()
808 {
809 LOG.warn("Text message too large > {} chars for {}",_connection.getMaxTextMessageSize(),_endp);
810 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
811
812 _opcode=-1;
813 _utf8.reset();
814 }
815
816 public void close(int code,String message)
817 {
818 if (code!=CLOSE_NORMAL)
819 LOG.warn("Close: "+code+" "+message);
820 _connection.close(code,message);
821 }
822
823 @Override
824 public String toString()
825 {
826 return WebSocketConnectionD08.this.toString()+"FH";
827 }
828 }
829
830
831 public static String hashKey(String key)
832 {
833 try
834 {
835 MessageDigest md = MessageDigest.getInstance("SHA1");
836 md.update(key.getBytes("UTF-8"));
837 md.update(MAGIC);
838 return new String(B64Code.encode(md.digest()));
839 }
840 catch (Exception e)
841 {
842 throw new RuntimeException(e);
843 }
844 }
845
846
/**
 * @return a short description containing the draft number, the parser and the generator
 */
@Override
public String toString()
{
    return String.format("WS/D%d p=%s g=%s", _draft, _parser, _generator);
}
852 }