1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.websocket;
15
16 import java.io.IOException;
17 import java.io.UnsupportedEncodingException;
18 import java.security.MessageDigest;
19 import java.util.Collections;
20 import java.util.List;
21
22 import javax.servlet.http.HttpServletRequest;
23 import javax.servlet.http.HttpServletResponse;
24
25 import org.eclipse.jetty.io.AbstractConnection;
26 import org.eclipse.jetty.io.AsyncEndPoint;
27 import org.eclipse.jetty.io.Buffer;
28 import org.eclipse.jetty.io.ByteArrayBuffer;
29 import org.eclipse.jetty.io.Connection;
30 import org.eclipse.jetty.io.EndPoint;
31 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
32 import org.eclipse.jetty.util.B64Code;
33 import org.eclipse.jetty.util.StringUtil;
34 import org.eclipse.jetty.util.Utf8StringBuilder;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37 import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
38 import org.eclipse.jetty.websocket.WebSocket.OnControl;
39 import org.eclipse.jetty.websocket.WebSocket.OnFrame;
40 import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
41
42 public class WebSocketConnectionD13 extends AbstractConnection implements WebSocketConnection
43 {
44 private static final Logger LOG = Log.getLogger(WebSocketConnectionD13.class);
45
46 final static byte OP_CONTINUATION = 0x00;
47 final static byte OP_TEXT = 0x01;
48 final static byte OP_BINARY = 0x02;
49 final static byte OP_EXT_DATA = 0x03;
50
51 final static byte OP_CONTROL = 0x08;
52 final static byte OP_CLOSE = 0x08;
53 final static byte OP_PING = 0x09;
54 final static byte OP_PONG = 0x0A;
55 final static byte OP_EXT_CTRL = 0x0B;
56
57 final static int CLOSE_NORMAL=1000;
58 final static int CLOSE_SHUTDOWN=1001;
59 final static int CLOSE_PROTOCOL=1002;
60 final static int CLOSE_BAD_DATA=1003;
61 final static int CLOSE_UNDEFINED=1004;
62 final static int CLOSE_NO_CODE=1005;
63 final static int CLOSE_NO_CLOSE=1006;
64 final static int CLOSE_NOT_UTF8=1007;
65 final static int CLOSE_POLICY_VIOLATION=1008;
66 final static int CLOSE_MESSAGE_TOO_LARGE=1009;
67 final static int CLOSE_REQUIRED_EXTENSION=1010;
68
69 final static int FLAG_FIN=0x8;
70
71 final static int VERSION=13;
72
73 static boolean isLastFrame(byte flags)
74 {
75 return (flags&FLAG_FIN)!=0;
76 }
77
78 static boolean isControlFrame(byte opcode)
79 {
80 return (opcode&OP_CONTROL)!=0;
81 }
82
83 private final static byte[] MAGIC;
84 private final IdleCheck _idle;
85 private final List<Extension> _extensions;
86 private final WebSocketParserD13 _parser;
87 private final WebSocketParser.FrameHandler _inbound;
88 private final WebSocketGeneratorD13 _generator;
89 private final WebSocketGenerator _outbound;
90 private final WebSocket _webSocket;
91 private final OnFrame _onFrame;
92 private final OnBinaryMessage _onBinaryMessage;
93 private final OnTextMessage _onTextMessage;
94 private final OnControl _onControl;
95 private final String _protocol;
96 private final int _draft;
97 private final ClassLoader _context;
98 private volatile int _closeCode;
99 private volatile String _closeMessage;
100 private volatile boolean _closedIn;
101 private volatile boolean _closedOut;
102 private int _maxTextMessageSize=-1;
103 private int _maxBinaryMessageSize=-1;
104
105 static
106 {
107 try
108 {
109 MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
110 }
111 catch (UnsupportedEncodingException e)
112 {
113 throw new RuntimeException(e);
114 }
115 }
116
117 private final WebSocketParser.FrameHandler _frameHandler= new WSFrameHandler();
118
119 private final WebSocket.FrameConnection _connection = new WSFrameConnection();
120
121
122
123 public WebSocketConnectionD13(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft)
124 throws IOException
125 {
126 this(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol,extensions,draft,null);
127 }
128
129
130 public WebSocketConnectionD13(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft, MaskGen maskgen)
131 throws IOException
132 {
133 super(endpoint,timestamp);
134
135 _context=Thread.currentThread().getContextClassLoader();
136
137 if (endpoint instanceof AsyncEndPoint)
138 ((AsyncEndPoint)endpoint).cancelIdle();
139
140 _draft=draft;
141 _endp.setMaxIdleTime(maxIdleTime);
142
143 _webSocket = websocket;
144 _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
145 _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
146 _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
147 _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
148 _generator = new WebSocketGeneratorD13(buffers, _endp,maskgen);
149
150 _extensions=extensions;
151 if (_extensions!=null)
152 {
153 int e=0;
154 for (Extension extension : _extensions)
155 {
156 extension.bind(
157 _connection,
158 e==extensions.size()-1?_frameHandler:extensions.get(e+1),
159 e==0?_generator:extensions.get(e-1));
160 e++;
161 }
162 }
163
164 _outbound=(_extensions==null||_extensions.size()==0)?_generator:extensions.get(extensions.size()-1);
165 _inbound=(_extensions==null||_extensions.size()==0)?_frameHandler:extensions.get(0);
166
167 _parser = new WebSocketParserD13(buffers, endpoint,_inbound,maskgen==null);
168
169 _protocol=protocol;
170
171
172 if (_endp instanceof SelectChannelEndPoint)
173 {
174 final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp;
175 scep.cancelIdle();
176 _idle=new IdleCheck()
177 {
178 public void access(EndPoint endp)
179 {
180 scep.scheduleIdle();
181 }
182 };
183 scep.scheduleIdle();
184 }
185 else
186 {
187 _idle = new IdleCheck()
188 {
189 public void access(EndPoint endp)
190 {}
191 };
192 }
193 }
194
195
196 public WebSocket.Connection getConnection()
197 {
198 return _connection;
199 }
200
201
202 public List<Extension> getExtensions()
203 {
204 if (_extensions==null)
205 return Collections.emptyList();
206
207 return _extensions;
208 }
209
210
211 public Connection handle() throws IOException
212 {
213 Thread current = Thread.currentThread();
214 ClassLoader oldcontext = current.getContextClassLoader();
215 current.setContextClassLoader(_context);
216 try
217 {
218
219 boolean progress=true;
220
221 while (progress)
222 {
223 int flushed=_generator.flushBuffer();
224 int filled=_parser.parseNext();
225
226 progress = flushed>0 || filled>0;
227
228 if (filled<0 || flushed<0)
229 {
230 _endp.close();
231 break;
232 }
233 }
234 }
235 catch(IOException e)
236 {
237 try
238 {
239 _endp.close();
240 }
241 catch(IOException e2)
242 {
243 LOG.ignore(e2);
244 }
245 throw e;
246 }
247 finally
248 {
249 current.setContextClassLoader(oldcontext);
250 _parser.returnBuffer();
251 _generator.returnBuffer();
252 if (_endp.isOpen())
253 {
254 _idle.access(_endp);
255 if (_closedIn && _closedOut && _outbound.isBufferEmpty())
256 _endp.close();
257 else if (_endp.isInputShutdown() && !_closedIn)
258 closeIn(CLOSE_NO_CLOSE,null);
259 else
260 checkWriteable();
261 }
262 }
263 return this;
264 }
265
266
267 public boolean isIdle()
268 {
269 return _parser.isBufferEmpty() && _outbound.isBufferEmpty();
270 }
271
272
273 @Override
274 public void idleExpired()
275 {
276 long idle = System.currentTimeMillis()-((SelectChannelEndPoint)_endp).getIdleTimestamp();
277 closeOut(WebSocketConnectionD13.CLOSE_NORMAL,"Idle for "+idle+"ms > "+_endp.getMaxIdleTime()+"ms");
278 }
279
280
281 public boolean isSuspended()
282 {
283 return false;
284 }
285
286
287 public void closed()
288 {
289 final boolean closed;
290 synchronized (this)
291 {
292 closed=_closeCode==0;
293 if (closed)
294 _closeCode=WebSocketConnectionD13.CLOSE_NO_CLOSE;
295 }
296 if (closed)
297 _webSocket.onClose(WebSocketConnectionD13.CLOSE_NO_CLOSE,"closed");
298 }
299
300
301 public void closeIn(int code,String message)
302 {
303 LOG.debug("ClosedIn {} {}",this,message);
304
305 final boolean closedOut;
306 final boolean closed;
307 synchronized (this)
308 {
309 closedOut=_closedOut;
310 _closedIn=true;
311 closed=_closeCode==0;
312 if (closed)
313 {
314 _closeCode=code;
315 _closeMessage=message;
316 }
317 }
318
319 try
320 {
321 if (closed)
322 _webSocket.onClose(code,message);
323 }
324 finally
325 {
326 try
327 {
328 if (closedOut)
329 _endp.close();
330 else
331 closeOut(code,message);
332 }
333 catch(IOException e)
334 {
335 LOG.ignore(e);
336 }
337 }
338 }
339
340
341 public void closeOut(int code,String message)
342 {
343 LOG.debug("ClosedOut {} {}",this,message);
344
345 final boolean close;
346 final boolean closed;
347 synchronized (this)
348 {
349 close=_closedIn || _closedOut;
350 _closedOut=true;
351 closed=_closeCode==0;
352 if (closed)
353 {
354 _closeCode=code;
355 _closeMessage=message;
356 }
357 }
358
359 try
360 {
361 if (closed)
362 _webSocket.onClose(code,message);
363 }
364 finally
365 {
366 try
367 {
368 if (close)
369 _endp.close();
370 else
371 {
372 if (code<=0)
373 code=WebSocketConnectionD13.CLOSE_NORMAL;
374 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
375 bytes[0]=(byte)(code/0x100);
376 bytes[1]=(byte)(code%0x100);
377 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD13.OP_CLOSE,bytes,0,bytes.length);
378 }
379 _outbound.flush();
380
381 }
382 catch(IOException e)
383 {
384 LOG.ignore(e);
385 }
386 }
387 }
388
389
390 public void fillBuffersFrom(Buffer buffer)
391 {
392 _parser.fill(buffer);
393 }
394
395
396 private void checkWriteable()
397 {
398 if (!_outbound.isBufferEmpty() && _endp instanceof AsyncEndPoint)
399 {
400 ((AsyncEndPoint)_endp).scheduleWrite();
401 }
402 }
403
404
405
406
407 private class WSFrameConnection implements WebSocket.FrameConnection
408 {
409 volatile boolean _disconnecting;
410
411
412 public void sendMessage(String content) throws IOException
413 {
414 if (_closedOut)
415 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
416 byte[] data = content.getBytes(StringUtil.__UTF8);
417 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD13.OP_TEXT,data,0,data.length);
418 checkWriteable();
419 _idle.access(_endp);
420 }
421
422
423 public void sendMessage(byte[] content, int offset, int length) throws IOException
424 {
425 if (_closedOut)
426 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
427 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD13.OP_BINARY,content,offset,length);
428 checkWriteable();
429 _idle.access(_endp);
430 }
431
432
433 public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
434 {
435 if (_closedOut)
436 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
437 _outbound.addFrame(flags,opcode,content,offset,length);
438 checkWriteable();
439 _idle.access(_endp);
440 }
441
442
443 public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException
444 {
445 if (_closedOut)
446 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
447 _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
448 checkWriteable();
449 _idle.access(_endp);
450 }
451
452
453 public boolean isMessageComplete(byte flags)
454 {
455 return isLastFrame(flags);
456 }
457
458
459 public boolean isOpen()
460 {
461 return _endp!=null&&_endp.isOpen();
462 }
463
464
465 public void close(int code, String message)
466 {
467 if (_disconnecting)
468 return;
469 _disconnecting=true;
470 WebSocketConnectionD13.this.closeOut(code,message);
471 }
472
473
474 public void setMaxIdleTime(int ms)
475 {
476 try
477 {
478 _endp.setMaxIdleTime(ms);
479 }
480 catch(IOException e)
481 {
482 LOG.warn(e);
483 }
484 }
485
486
487 public void setMaxTextMessageSize(int size)
488 {
489 _maxTextMessageSize=size;
490 }
491
492
493 public void setMaxBinaryMessageSize(int size)
494 {
495 _maxBinaryMessageSize=size;
496 }
497
498
499 public int getMaxIdleTime()
500 {
501 return _endp.getMaxIdleTime();
502 }
503
504
505 public int getMaxTextMessageSize()
506 {
507 return _maxTextMessageSize;
508 }
509
510
511 public int getMaxBinaryMessageSize()
512 {
513 return _maxBinaryMessageSize;
514 }
515
516
517 public String getProtocol()
518 {
519 return _protocol;
520 }
521
522
523 public byte binaryOpcode()
524 {
525 return OP_BINARY;
526 }
527
528
529 public byte textOpcode()
530 {
531 return OP_TEXT;
532 }
533
534
535 public byte continuationOpcode()
536 {
537 return OP_CONTINUATION;
538 }
539
540
541 public byte finMask()
542 {
543 return FLAG_FIN;
544 }
545
546
547 public boolean isControl(byte opcode)
548 {
549 return isControlFrame(opcode);
550 }
551
552
553 public boolean isText(byte opcode)
554 {
555 return opcode==OP_TEXT;
556 }
557
558
559 public boolean isBinary(byte opcode)
560 {
561 return opcode==OP_BINARY;
562 }
563
564
565 public boolean isContinuation(byte opcode)
566 {
567 return opcode==OP_CONTINUATION;
568 }
569
570
571 public boolean isClose(byte opcode)
572 {
573 return opcode==OP_CLOSE;
574 }
575
576
577 public boolean isPing(byte opcode)
578 {
579 return opcode==OP_PING;
580 }
581
582
583 public boolean isPong(byte opcode)
584 {
585 return opcode==OP_PONG;
586 }
587
588
589 public void disconnect()
590 {
591 close(CLOSE_NORMAL,null);
592 }
593
594
595 public void setAllowFrameFragmentation(boolean allowFragmentation)
596 {
597 _parser.setFakeFragments(allowFragmentation);
598 }
599
600
601 public boolean isAllowFrameFragmentation()
602 {
603 return _parser.isFakeFragments();
604 }
605
606
607 @Override
608 public String toString()
609 {
610 return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
611 }
612 }
613
614
615
616
617 private class WSFrameHandler implements WebSocketParser.FrameHandler
618 {
619 private final Utf8StringBuilder _utf8 = new Utf8StringBuilder(512);
620 private ByteArrayBuffer _aggregate;
621 private byte _opcode=-1;
622
623 public void onFrame(final byte flags, final byte opcode, final Buffer buffer)
624 {
625 boolean lastFrame = isLastFrame(flags);
626
627 synchronized(WebSocketConnectionD13.this)
628 {
629
630 if (_closedIn)
631 return;
632 }
633 try
634 {
635 byte[] array=buffer.array();
636
637
638 if (_onFrame!=null)
639 {
640 if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
641 return;
642 }
643
644 if (_onControl!=null && isControlFrame(opcode))
645 {
646 if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
647 return;
648 }
649
650 switch(opcode)
651 {
652 case WebSocketConnectionD13.OP_CONTINUATION:
653 {
654
655 if (_onTextMessage!=null && _opcode==WebSocketConnectionD13.OP_TEXT)
656 {
657 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
658 {
659
660 if (lastFrame)
661 {
662 _opcode=-1;
663 String msg =_utf8.toString();
664 _utf8.reset();
665 _onTextMessage.onMessage(msg);
666 }
667 }
668 else
669 textMessageTooLarge();
670 }
671
672 if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
673 {
674 if (checkBinaryMessageSize(_aggregate.length(),buffer.length()))
675 {
676 _aggregate.put(buffer);
677
678
679 if (lastFrame && _onBinaryMessage!=null)
680 {
681 try
682 {
683 _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
684 }
685 finally
686 {
687 _opcode=-1;
688 _aggregate.clear();
689 }
690 }
691 }
692 }
693 break;
694 }
695 case WebSocketConnectionD13.OP_PING:
696 {
697 LOG.debug("PING {}",this);
698 if (!_closedOut)
699 _connection.sendControl(WebSocketConnectionD13.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
700 break;
701 }
702
703 case WebSocketConnectionD13.OP_PONG:
704 {
705 LOG.debug("PONG {}",this);
706 break;
707 }
708
709 case WebSocketConnectionD13.OP_CLOSE:
710 {
711 int code=WebSocketConnectionD13.CLOSE_NO_CODE;
712 String message=null;
713 if (buffer.length()>=2)
714 {
715 code=buffer.array()[buffer.getIndex()]*0x100+buffer.array()[buffer.getIndex()+1];
716 if (buffer.length()>2)
717 message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
718 }
719 closeIn(code,message);
720 break;
721 }
722
723 case WebSocketConnectionD13.OP_TEXT:
724 {
725 if(_onTextMessage!=null)
726 {
727 if (_connection.getMaxTextMessageSize()<=0)
728 {
729
730 if (lastFrame)
731 _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
732 else
733 {
734 LOG.warn("Frame discarded. Text aggregation disabled for {}",_endp);
735 _connection.close(WebSocketConnectionD13.CLOSE_POLICY_VIOLATION,"Text frame aggregation disabled");
736 }
737 }
738
739 else if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
740 {
741 if (lastFrame)
742 {
743 String msg =_utf8.toString();
744 _utf8.reset();
745 _onTextMessage.onMessage(msg);
746 }
747 else
748 {
749 _opcode=WebSocketConnectionD13.OP_TEXT;
750 }
751 }
752 else
753 textMessageTooLarge();
754 }
755 break;
756 }
757
758 default:
759 {
760 if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length()))
761 {
762 if (lastFrame)
763 {
764 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
765 }
766 else if (_connection.getMaxBinaryMessageSize()>=0)
767 {
768 _opcode=opcode;
769
770 if (_aggregate==null)
771 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
772 _aggregate.put(buffer);
773 }
774 else
775 {
776 LOG.warn("Frame discarded. Binary aggregation disabed for {}",_endp);
777 _connection.close(WebSocketConnectionD13.CLOSE_POLICY_VIOLATION,"Binary frame aggregation disabled");
778 }
779 }
780 }
781 }
782 }
783 catch(ThreadDeath th)
784 {
785 throw th;
786 }
787 catch(Throwable th)
788 {
789 LOG.warn(th);
790 }
791 }
792
793 private boolean checkBinaryMessageSize(int bufferLen, int length)
794 {
795 int max = _connection.getMaxBinaryMessageSize();
796 if (max>0 && (bufferLen+length)>max)
797 {
798 LOG.warn("Binary message too large > {}B for {}",_connection.getMaxBinaryMessageSize(),_endp);
799 _connection.close(WebSocketConnectionD13.CLOSE_MESSAGE_TOO_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
800 _opcode=-1;
801 if (_aggregate!=null)
802 _aggregate.clear();
803 return false;
804 }
805 return true;
806 }
807
808 private void textMessageTooLarge()
809 {
810 LOG.warn("Text message too large > {} chars for {}",_connection.getMaxTextMessageSize(),_endp);
811 _connection.close(WebSocketConnectionD13.CLOSE_MESSAGE_TOO_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
812
813 _opcode=-1;
814 _utf8.reset();
815 }
816
817 public void close(int code,String message)
818 {
819 if (code!=CLOSE_NORMAL)
820 LOG.warn("Close: "+code+" "+message);
821 _connection.close(code,message);
822 }
823
824 @Override
825 public String toString()
826 {
827 return WebSocketConnectionD13.this.toString()+"FH";
828 }
829 }
830
831
832 private interface IdleCheck
833 {
834 void access(EndPoint endp);
835 }
836
837
838 public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
839 {
840 String uri=request.getRequestURI();
841 String query=request.getQueryString();
842 if (query!=null && query.length()>0)
843 uri+="?"+query;
844 String key = request.getHeader("Sec-WebSocket-Key");
845
846 response.setHeader("Upgrade","WebSocket");
847 response.addHeader("Connection","Upgrade");
848 response.addHeader("Sec-WebSocket-Accept",hashKey(key));
849 if (subprotocol!=null)
850 response.addHeader("Sec-WebSocket-Protocol",subprotocol);
851
852 for(Extension ext : _extensions)
853 response.addHeader("Sec-WebSocket-Extensions",ext.getParameterizedName());
854
855 response.sendError(101);
856
857 if (_onFrame!=null)
858 _onFrame.onHandshake(_connection);
859 _webSocket.onOpen(_connection);
860 }
861
862
863 public static String hashKey(String key)
864 {
865 try
866 {
867 MessageDigest md = MessageDigest.getInstance("SHA1");
868 md.update(key.getBytes("UTF-8"));
869 md.update(MAGIC);
870 return new String(B64Code.encode(md.digest()));
871 }
872 catch (Exception e)
873 {
874 throw new RuntimeException(e);
875 }
876 }
877
878
879 @Override
880 public String toString()
881 {
882 return "WS/D"+_draft+"-"+_endp;
883 }
884 }