1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.websocket;
15
16 import java.io.IOException;
17 import java.io.UnsupportedEncodingException;
18 import java.security.MessageDigest;
19 import java.util.Collections;
20 import java.util.List;
21
22 import javax.servlet.http.HttpServletRequest;
23 import javax.servlet.http.HttpServletResponse;
24
25 import org.eclipse.jetty.io.AbstractConnection;
26 import org.eclipse.jetty.io.AsyncEndPoint;
27 import org.eclipse.jetty.io.Buffer;
28 import org.eclipse.jetty.io.ByteArrayBuffer;
29 import org.eclipse.jetty.io.Connection;
30 import org.eclipse.jetty.io.EndPoint;
31 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
32 import org.eclipse.jetty.util.B64Code;
33 import org.eclipse.jetty.util.StringUtil;
34 import org.eclipse.jetty.util.Utf8StringBuilder;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37 import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
38 import org.eclipse.jetty.websocket.WebSocket.OnControl;
39 import org.eclipse.jetty.websocket.WebSocket.OnFrame;
40 import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
41
42 public class WebSocketConnectionD12 extends AbstractConnection implements WebSocketConnection
43 {
44 private static final Logger LOG = Log.getLogger(WebSocketConnectionD12.class);
45
46 final static byte OP_CONTINUATION = 0x00;
47 final static byte OP_TEXT = 0x01;
48 final static byte OP_BINARY = 0x02;
49 final static byte OP_EXT_DATA = 0x03;
50
51 final static byte OP_CONTROL = 0x08;
52 final static byte OP_CLOSE = 0x08;
53 final static byte OP_PING = 0x09;
54 final static byte OP_PONG = 0x0A;
55 final static byte OP_EXT_CTRL = 0x0B;
56
57 final static int CLOSE_NORMAL=1000;
58 final static int CLOSE_SHUTDOWN=1001;
59 final static int CLOSE_PROTOCOL=1002;
60 final static int CLOSE_BADDATA=1003;
61 final static int CLOSE_NOCODE=1005;
62 final static int CLOSE_NOCLOSE=1006;
63 final static int CLOSE_NOTUTF8=1007;
64
65 final static int FLAG_FIN=0x8;
66
67 final static int VERSION=8;
68
69 static boolean isLastFrame(byte flags)
70 {
71 return (flags&FLAG_FIN)!=0;
72 }
73
74 static boolean isControlFrame(byte opcode)
75 {
76 return (opcode&OP_CONTROL)!=0;
77 }
78
/** GUID bytes appended to the client key by hashKey(); assigned in the static initializer below. */
private static final byte[] MAGIC;

/** Notified after connection activity; re-schedules the idle timer on select-channel endpoints. */
private final IdleCheck _idle;
/** Extension chain supplied to the constructor; may be null. */
private final List<Extension> _extensions;
private final WebSocketParserD12 _parser;
/** First handler of parsed frames: the first extension, or the frame handler itself. */
private final WebSocketParser.FrameHandler _inbound;
private final WebSocketGeneratorD12 _generator;
/** Entry point for outgoing frames: the last extension, or the generator itself. */
private final WebSocketGenerator _outbound;
private final WebSocket _webSocket;
// The application websocket viewed through each optional callback interface; null when not implemented.
private final OnFrame _onFrame;
private final OnBinaryMessage _onBinaryMessage;
private final OnTextMessage _onTextMessage;
private final OnControl _onControl;
private final String _protocol;
private final int _draft;
/** Context class loader captured at construction and installed for the duration of handle(). */
private final ClassLoader _context;
// Close state; written under synchronized(this) in closed()/closeIn()/closeOut().
private volatile int _closeCode;
private volatile String _closeMessage;
private volatile boolean _closedIn;
private volatile boolean _closedOut;
// Message size limits; the frame handler aggregates text only when >0 and binary only when >=0.
private int _maxTextMessageSize = -1;
private int _maxBinaryMessageSize = -1;

static
{
    try
    {
        MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
    }
    catch (UnsupportedEncodingException e)
    {
        throw new RuntimeException(e);
    }
}

/** Terminal handler for parsed frames (after any extensions). */
private final WebSocketParser.FrameHandler _frameHandler = new WSFrameHandler();

/** Connection facade handed to the application websocket and to extensions. */
private final WebSocket.FrameConnection _connection = new WSFrameConnection();
116
117
118
/**
 * Creates a connection without a mask generator.
 * Delegates to the full constructor, passing <code>null</code> for <code>maskgen</code>.
 *
 * @throws IOException if the full constructor throws it
 */
public WebSocketConnectionD12(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft)
    throws IOException
{
    this(websocket, endpoint, buffers, timestamp, maxIdleTime, protocol, extensions, draft, null);
}
124
125
/**
 * Creates a connection, wiring the parser, generator and optional extension chain.
 *
 * @param websocket   the application websocket; probed for the optional OnFrame,
 *                    OnTextMessage, OnBinaryMessage and OnControl callback interfaces
 * @param endpoint    the endpoint to read from and write to
 * @param buffers     buffers handed to the parser and the generator
 * @param timestamp   passed through to the superclass constructor
 * @param maxIdleTime max idle time set on the endpoint
 * @param protocol    the protocol reported by the connection's getProtocol()
 * @param extensions  extension chain, may be null or empty
 * @param draft       draft number, used only by toString()
 * @param maskgen     mask generator for the frame generator, may be null; the parser
 *                    is created with its boolean argument set to <code>maskgen==null</code>
 * @throws IOException if configuring the endpoint fails
 */
public WebSocketConnectionD12(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft, MaskGen maskgen)
    throws IOException
{
    super(endpoint,timestamp);

    // Remember the creator's context class loader so handle() can restore it.
    _context = Thread.currentThread().getContextClassLoader();

    if (endpoint instanceof AsyncEndPoint)
    {
        final AsyncEndPoint async = (AsyncEndPoint)endpoint;
        async.cancelIdle();
    }

    _draft = draft;
    _endp.setMaxIdleTime(maxIdleTime);

    _webSocket = websocket;
    _onFrame = websocket instanceof OnFrame ? (OnFrame)websocket : null;
    _onTextMessage = websocket instanceof OnTextMessage ? (OnTextMessage)websocket : null;
    _onBinaryMessage = websocket instanceof OnBinaryMessage ? (OnBinaryMessage)websocket : null;
    _onControl = websocket instanceof OnControl ? (OnControl)websocket : null;
    _generator = new WebSocketGeneratorD12(buffers, _endp, maskgen);

    _extensions = extensions;

    // Bind each extension to its neighbours: inbound frames flow towards the
    // frame handler, outbound frames flow towards the generator.
    final int count = (extensions == null) ? 0 : extensions.size();
    for (int i = 0; i < count; i++)
    {
        extensions.get(i).bind(
            _connection,
            i == count - 1 ? _frameHandler : extensions.get(i + 1),
            i == 0 ? _generator : extensions.get(i - 1));
    }

    // Pick the two ends of the chain (or bypass it when there are no extensions).
    if (count == 0)
    {
        _outbound = _generator;
        _inbound = _frameHandler;
    }
    else
    {
        _outbound = extensions.get(count - 1);
        _inbound = extensions.get(0);
    }

    _parser = new WebSocketParserD12(buffers, endpoint, _inbound, maskgen == null);

    _protocol = protocol;

    if (_endp instanceof SelectChannelEndPoint)
    {
        final SelectChannelEndPoint selectEndp = (SelectChannelEndPoint)_endp;
        selectEndp.cancelIdle();
        // Every access re-schedules the endpoint's idle timer.
        _idle = new IdleCheck()
        {
            public void access(EndPoint endp)
            {
                selectEndp.scheduleIdle();
            }
        };
        selectEndp.scheduleIdle();
    }
    else
    {
        // No idle scheduling available: accesses are ignored.
        _idle = new IdleCheck()
        {
            public void access(EndPoint endp)
            {
            }
        };
    }
}
190
191
192 public WebSocket.Connection getConnection()
193 {
194 return _connection;
195 }
196
197
198 public List<Extension> getExtensions()
199 {
200 if (_extensions==null)
201 return Collections.emptyList();
202
203 return _extensions;
204 }
205
206
207 public Connection handle() throws IOException
208 {
209 Thread current = Thread.currentThread();
210 ClassLoader oldcontext = current.getContextClassLoader();
211 current.setContextClassLoader(_context);
212 try
213 {
214
215 boolean progress=true;
216
217 while (progress)
218 {
219 int flushed=_generator.flushBuffer();
220 int filled=_parser.parseNext();
221
222 progress = flushed>0 || filled>0;
223
224 if (filled<0 || flushed<0)
225 {
226 _endp.close();
227 break;
228 }
229 }
230 }
231 catch(IOException e)
232 {
233 try
234 {
235 _endp.close();
236 }
237 catch(IOException e2)
238 {
239 LOG.ignore(e2);
240 }
241 throw e;
242 }
243 finally
244 {
245 current.setContextClassLoader(oldcontext);
246 _parser.returnBuffer();
247 _generator.returnBuffer();
248 if (_endp.isOpen())
249 {
250 _idle.access(_endp);
251 if (_closedIn && _closedOut && _outbound.isBufferEmpty())
252 _endp.close();
253 else if (_endp.isInputShutdown() && !_closedIn)
254 closeIn(CLOSE_NOCLOSE,null);
255 else
256 checkWriteable();
257 }
258 }
259 return this;
260 }
261
262
263 public boolean isIdle()
264 {
265 return _parser.isBufferEmpty() && _outbound.isBufferEmpty();
266 }
267
268
269 @Override
270 public void idleExpired()
271 {
272 long idle = System.currentTimeMillis()-((SelectChannelEndPoint)_endp).getIdleTimestamp();
273 closeOut(WebSocketConnectionD12.CLOSE_NORMAL,"Idle for "+idle+"ms > "+_endp.getMaxIdleTime()+"ms");
274 }
275
276
277 public boolean isSuspended()
278 {
279 return false;
280 }
281
282
283 public void closed()
284 {
285 final boolean closed;
286 synchronized (this)
287 {
288 closed=_closeCode==0;
289 if (closed)
290 _closeCode=WebSocketConnectionD12.CLOSE_NOCLOSE;
291 }
292 if (closed)
293 _webSocket.onClose(WebSocketConnectionD12.CLOSE_NOCLOSE,"closed");
294 }
295
296
297 public void closeIn(int code,String message)
298 {
299 LOG.debug("ClosedIn {} {}",this,message);
300
301 final boolean closedOut;
302 final boolean closed;
303 synchronized (this)
304 {
305 closedOut=_closedOut;
306 _closedIn=true;
307 closed=_closeCode==0;
308 if (closed)
309 {
310 _closeCode=code;
311 _closeMessage=message;
312 }
313 }
314
315 try
316 {
317 if (closed)
318 _webSocket.onClose(code,message);
319 }
320 finally
321 {
322 try
323 {
324 if (closedOut)
325 _endp.close();
326 else
327 closeOut(code,message);
328 }
329 catch(IOException e)
330 {
331 LOG.ignore(e);
332 }
333 }
334 }
335
336
337 public void closeOut(int code,String message)
338 {
339 LOG.debug("ClosedOut {} {}",this,message);
340
341 final boolean close;
342 final boolean closed;
343 synchronized (this)
344 {
345 close=_closedIn || _closedOut;
346 _closedOut=true;
347 closed=_closeCode==0;
348 if (closed)
349 {
350 _closeCode=code;
351 _closeMessage=message;
352 }
353 }
354
355 try
356 {
357 if (closed)
358 _webSocket.onClose(code,message);
359 }
360 finally
361 {
362 try
363 {
364 if (close)
365 _endp.close();
366 else
367 {
368 if (code<=0)
369 code=WebSocketConnectionD12.CLOSE_NORMAL;
370 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
371 bytes[0]=(byte)(code/0x100);
372 bytes[1]=(byte)(code%0x100);
373 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD12.OP_CLOSE,bytes,0,bytes.length);
374 }
375 _outbound.flush();
376
377 }
378 catch(IOException e)
379 {
380 LOG.ignore(e);
381 }
382 }
383 }
384
385
386 public void fillBuffersFrom(Buffer buffer)
387 {
388 _parser.fill(buffer);
389 }
390
391
392 private void checkWriteable()
393 {
394 if (!_outbound.isBufferEmpty() && _endp instanceof AsyncEndPoint)
395 {
396 ((AsyncEndPoint)_endp).scheduleWrite();
397 }
398 }
399
400
401
402
403 private class WSFrameConnection implements WebSocket.FrameConnection
404 {
405 volatile boolean _disconnecting;
406
407
408 public void sendMessage(String content) throws IOException
409 {
410 if (_closedOut)
411 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
412 byte[] data = content.getBytes(StringUtil.__UTF8);
413 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD12.OP_TEXT,data,0,data.length);
414 checkWriteable();
415 _idle.access(_endp);
416 }
417
418
419 public void sendMessage(byte[] content, int offset, int length) throws IOException
420 {
421 if (_closedOut)
422 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
423 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD12.OP_BINARY,content,offset,length);
424 checkWriteable();
425 _idle.access(_endp);
426 }
427
428
429 public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
430 {
431 if (_closedOut)
432 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
433 _outbound.addFrame(flags,opcode,content,offset,length);
434 checkWriteable();
435 _idle.access(_endp);
436 }
437
438
439 public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException
440 {
441 if (_closedOut)
442 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
443 _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
444 checkWriteable();
445 _idle.access(_endp);
446 }
447
448
449 public boolean isMessageComplete(byte flags)
450 {
451 return isLastFrame(flags);
452 }
453
454
455 public boolean isOpen()
456 {
457 return _endp!=null&&_endp.isOpen();
458 }
459
460
461 public void close(int code, String message)
462 {
463 if (_disconnecting)
464 return;
465 _disconnecting=true;
466 WebSocketConnectionD12.this.closeOut(code,message);
467 }
468
469
470 public void setMaxIdleTime(int ms)
471 {
472 try
473 {
474 _endp.setMaxIdleTime(ms);
475 }
476 catch(IOException e)
477 {
478 LOG.warn(e);
479 }
480 }
481
482
483 public void setMaxTextMessageSize(int size)
484 {
485 _maxTextMessageSize=size;
486 }
487
488
489 public void setMaxBinaryMessageSize(int size)
490 {
491 _maxBinaryMessageSize=size;
492 }
493
494
495 public int getMaxIdleTime()
496 {
497 return _endp.getMaxIdleTime();
498 }
499
500
501 public int getMaxTextMessageSize()
502 {
503 return _maxTextMessageSize;
504 }
505
506
507 public int getMaxBinaryMessageSize()
508 {
509 return _maxBinaryMessageSize;
510 }
511
512
513 public String getProtocol()
514 {
515 return _protocol;
516 }
517
518
519 public byte binaryOpcode()
520 {
521 return OP_BINARY;
522 }
523
524
525 public byte textOpcode()
526 {
527 return OP_TEXT;
528 }
529
530
531 public byte continuationOpcode()
532 {
533 return OP_CONTINUATION;
534 }
535
536
537 public byte finMask()
538 {
539 return FLAG_FIN;
540 }
541
542
543 public boolean isControl(byte opcode)
544 {
545 return isControlFrame(opcode);
546 }
547
548
549 public boolean isText(byte opcode)
550 {
551 return opcode==OP_TEXT;
552 }
553
554
555 public boolean isBinary(byte opcode)
556 {
557 return opcode==OP_BINARY;
558 }
559
560
561 public boolean isContinuation(byte opcode)
562 {
563 return opcode==OP_CONTINUATION;
564 }
565
566
567 public boolean isClose(byte opcode)
568 {
569 return opcode==OP_CLOSE;
570 }
571
572
573 public boolean isPing(byte opcode)
574 {
575 return opcode==OP_PING;
576 }
577
578
579 public boolean isPong(byte opcode)
580 {
581 return opcode==OP_PONG;
582 }
583
584
585 public void disconnect()
586 {
587 close(CLOSE_NORMAL,null);
588 }
589
590
591 public void setAllowFrameFragmentation(boolean allowFragmentation)
592 {
593 _parser.setFakeFragments(allowFragmentation);
594 }
595
596
597 public boolean isAllowFrameFragmentation()
598 {
599 return _parser.isFakeFragments();
600 }
601
602
603 @Override
604 public String toString()
605 {
606 return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
607 }
608 }
609
610
611
612
613 private class WSFrameHandler implements WebSocketParser.FrameHandler
614 {
615 private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
616 private ByteArrayBuffer _aggregate;
617 private byte _opcode=-1;
618
619 public void onFrame(final byte flags, final byte opcode, final Buffer buffer)
620 {
621 boolean lastFrame = isLastFrame(flags);
622
623 synchronized(WebSocketConnectionD12.this)
624 {
625
626 if (_closedIn)
627 return;
628 }
629 try
630 {
631 byte[] array=buffer.array();
632
633
634 if (_onFrame!=null)
635 {
636 if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
637 return;
638 }
639
640 if (_onControl!=null && isControlFrame(opcode))
641 {
642 if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
643 return;
644 }
645
646 switch(opcode)
647 {
648 case WebSocketConnectionD12.OP_CONTINUATION:
649 {
650
651 if (_onTextMessage!=null && _opcode==WebSocketConnectionD12.OP_TEXT)
652 {
653 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
654 {
655
656 if (lastFrame)
657 {
658 _opcode=-1;
659 String msg =_utf8.toString();
660 _utf8.reset();
661 _onTextMessage.onMessage(msg);
662 }
663 }
664 else
665 textMessageTooLarge();
666 }
667
668 if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
669 {
670 if (checkBinaryMessageSize(_aggregate.length(),buffer.length()))
671 {
672 _aggregate.put(buffer);
673
674
675 if (lastFrame && _onBinaryMessage!=null)
676 {
677 try
678 {
679 _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
680 }
681 finally
682 {
683 _opcode=-1;
684 _aggregate.clear();
685 }
686 }
687 }
688 }
689 break;
690 }
691 case WebSocketConnectionD12.OP_PING:
692 {
693 LOG.debug("PING {}",this);
694 if (!_closedOut)
695 _connection.sendControl(WebSocketConnectionD12.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
696 break;
697 }
698
699 case WebSocketConnectionD12.OP_PONG:
700 {
701 LOG.debug("PONG {}",this);
702 break;
703 }
704
705 case WebSocketConnectionD12.OP_CLOSE:
706 {
707 int code=WebSocketConnectionD12.CLOSE_NOCODE;
708 String message=null;
709 if (buffer.length()>=2)
710 {
711 code=buffer.array()[buffer.getIndex()]*0x100+buffer.array()[buffer.getIndex()+1];
712 if (buffer.length()>2)
713 message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
714 }
715 closeIn(code,message);
716 break;
717 }
718
719 case WebSocketConnectionD12.OP_TEXT:
720 {
721 if(_onTextMessage!=null)
722 {
723 if (_connection.getMaxTextMessageSize()<=0)
724 {
725
726 if (lastFrame)
727 _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
728 else
729 {
730 LOG.warn("Frame discarded. Text aggregation disabled for {}",_endp);
731 _connection.close(WebSocketConnectionD12.CLOSE_BADDATA,"Text frame aggregation disabled");
732 }
733 }
734
735 else if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
736 {
737 if (lastFrame)
738 {
739 String msg =_utf8.toString();
740 _utf8.reset();
741 _onTextMessage.onMessage(msg);
742 }
743 else
744 {
745 _opcode=WebSocketConnectionD12.OP_TEXT;
746 }
747 }
748 else
749 textMessageTooLarge();
750 }
751 break;
752 }
753
754 default:
755 {
756 if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length()))
757 {
758 if (lastFrame)
759 {
760 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
761 }
762 else if (_connection.getMaxBinaryMessageSize()>=0)
763 {
764 _opcode=opcode;
765
766 if (_aggregate==null)
767 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
768 _aggregate.put(buffer);
769 }
770 else
771 {
772 LOG.warn("Frame discarded. Binary aggregation disabed for {}",_endp);
773 _connection.close(WebSocketConnectionD12.CLOSE_BADDATA,"Binary frame aggregation disabled");
774 }
775 }
776 }
777 }
778 }
779 catch(ThreadDeath th)
780 {
781 throw th;
782 }
783 catch(Throwable th)
784 {
785 LOG.warn(th);
786 }
787 }
788
789 private boolean checkBinaryMessageSize(int bufferLen, int length)
790 {
791 int max = _connection.getMaxBinaryMessageSize();
792 if (max>0 && (bufferLen+length)>max)
793 {
794 LOG.warn("Binary message too large > {}B for {}",_connection.getMaxBinaryMessageSize(),_endp);
795 _connection.close(WebSocketConnectionD12.CLOSE_BADDATA,"Message size > "+_connection.getMaxBinaryMessageSize());
796 _opcode=-1;
797 if (_aggregate!=null)
798 _aggregate.clear();
799 return false;
800 }
801 return true;
802 }
803
804 private void textMessageTooLarge()
805 {
806 LOG.warn("Text message too large > {} chars for {}",_connection.getMaxTextMessageSize(),_endp);
807 _connection.close(WebSocketConnectionD12.CLOSE_BADDATA,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
808
809 _opcode=-1;
810 _utf8.reset();
811 }
812
813 public void close(int code,String message)
814 {
815 if (code!=CLOSE_NORMAL)
816 LOG.warn("Close: "+code+" "+message);
817 _connection.close(code,message);
818 }
819
820 @Override
821 public String toString()
822 {
823 return WebSocketConnectionD12.this.toString()+"FH";
824 }
825 }
826
827
828 private interface IdleCheck
829 {
830 void access(EndPoint endp);
831 }
832
833
834 public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
835 {
836 String uri=request.getRequestURI();
837 String query=request.getQueryString();
838 if (query!=null && query.length()>0)
839 uri+="?"+query;
840 String key = request.getHeader("Sec-WebSocket-Key");
841
842 response.setHeader("Upgrade","WebSocket");
843 response.addHeader("Connection","Upgrade");
844 response.addHeader("Sec-WebSocket-Accept",hashKey(key));
845 if (subprotocol!=null)
846 response.addHeader("Sec-WebSocket-Protocol",subprotocol);
847
848 for(Extension ext : _extensions)
849 response.addHeader("Sec-WebSocket-Extensions",ext.getParameterizedName());
850
851 response.sendError(101);
852
853 if (_onFrame!=null)
854 _onFrame.onHandshake(_connection);
855 _webSocket.onOpen(_connection);
856 }
857
858
859 public static String hashKey(String key)
860 {
861 try
862 {
863 MessageDigest md = MessageDigest.getInstance("SHA1");
864 md.update(key.getBytes("UTF-8"));
865 md.update(MAGIC);
866 return new String(B64Code.encode(md.digest()));
867 }
868 catch (Exception e)
869 {
870 throw new RuntimeException(e);
871 }
872 }
873
874
875 @Override
876 public String toString()
877 {
878 return "WS/D"+_draft+"-"+_endp;
879 }
880 }