1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.websocket;
15
16 import java.io.IOException;
17 import java.io.UnsupportedEncodingException;
18 import java.security.MessageDigest;
19 import java.util.Collections;
20 import java.util.List;
21
22 import javax.servlet.http.HttpServletRequest;
23 import javax.servlet.http.HttpServletResponse;
24
25 import org.eclipse.jetty.io.AbstractConnection;
26 import org.eclipse.jetty.io.AsyncEndPoint;
27 import org.eclipse.jetty.io.Buffer;
28 import org.eclipse.jetty.io.ByteArrayBuffer;
29 import org.eclipse.jetty.io.Connection;
30 import org.eclipse.jetty.io.EndPoint;
31 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
32 import org.eclipse.jetty.util.B64Code;
33 import org.eclipse.jetty.util.StringUtil;
34 import org.eclipse.jetty.util.Utf8StringBuilder;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
37 import org.eclipse.jetty.websocket.WebSocket.OnControl;
38 import org.eclipse.jetty.websocket.WebSocket.OnFrame;
39 import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
40 import org.eclipse.jetty.websocket.WebSocketGeneratorD10.MaskGen;
41
42 public class WebSocketConnectionD10 extends AbstractConnection implements WebSocketConnection
43 {
44 final static byte OP_CONTINUATION = 0x00;
45 final static byte OP_TEXT = 0x01;
46 final static byte OP_BINARY = 0x02;
47 final static byte OP_EXT_DATA = 0x03;
48
49 final static byte OP_CONTROL = 0x08;
50 final static byte OP_CLOSE = 0x08;
51 final static byte OP_PING = 0x09;
52 final static byte OP_PONG = 0x0A;
53 final static byte OP_EXT_CTRL = 0x0B;
54
55 final static int CLOSE_NORMAL=1000;
56 final static int CLOSE_SHUTDOWN=1001;
57 final static int CLOSE_PROTOCOL=1002;
58 final static int CLOSE_BADDATA=1003;
59 final static int CLOSE_LARGE=1004;
60 final static int CLOSE_NOCODE=1005;
61 final static int CLOSE_NOCLOSE=1006;
62 final static int CLOSE_NOTUTF8=1007;
63
64 final static int FLAG_FIN=0x8;
65
66 final static int VERSION=8;
67
68 static boolean isLastFrame(byte flags)
69 {
70 return (flags&FLAG_FIN)!=0;
71 }
72
73 static boolean isControlFrame(byte opcode)
74 {
75 return (opcode&OP_CONTROL)!=0;
76 }
77
/** Bytes appended to the client key by hashKey(String) before digesting. */
private static final byte[] MAGIC;

/** Hook invoked whenever the connection is used, to (re)schedule idle handling. */
private final IdleCheck _idle;
private final List<Extension> _extensions;
private final WebSocketParserD10 _parser;
/** First handler of parsed frames: the first extension, or the frame handler itself. */
private final WebSocketParser.FrameHandler _inbound;
private final WebSocketGeneratorD10 _generator;
/** First sink of generated frames: the last extension, or the generator itself. */
private final WebSocketGenerator _outbound;
private final WebSocket _webSocket;
/* The websocket cast to each optional callback interface it implements, else null. */
private final OnFrame _onFrame;
private final OnBinaryMessage _onBinaryMessage;
private final OnTextMessage _onTextMessage;
private final OnControl _onControl;
private final String _protocol;
private final int _draft;
/** Context class loader captured at construction and installed while handling. */
private final ClassLoader _context;
/** Close code reported to the websocket; 0 until a close has been recorded. */
private volatile int _closeCode;
private volatile String _closeMessage;
private volatile boolean _closedIn;
private volatile boolean _closedOut;
private int _maxTextMessageSize;
private int _maxBinaryMessageSize = -1;

static
{
    final byte[] magic;
    try
    {
        magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
    }
    catch (UnsupportedEncodingException e)
    {
        throw new RuntimeException(e);
    }
    MAGIC = magic;
}

/** Terminal handler that dispatches parsed frames to the websocket callbacks. */
private final WebSocketParser.FrameHandler _frameHandler = new WSFrameHandler();

/** Connection facade handed to the application and to the extensions. */
private final WebSocket.FrameConnection _connection = new WSFrameConnection();
119
120
121
/**
 * Creates a connection without a mask generator.
 * Delegates to the nine-argument constructor, passing null as the mask generator.
 *
 * @throws IOException if the delegated constructor fails
 */
public WebSocketConnectionD10(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions, int draft)
    throws IOException
{
    this(websocket, endpoint, buffers, timestamp, maxIdleTime, protocol, extensions, draft, null);
}
127
128
/**
 * Creates a connection, wiring the generator, the parser and any extensions together.
 *
 * @param websocket the application websocket receiving callbacks
 * @param endpoint the endpoint to read from and write to
 * @param buffers the buffers used by the parser and generator
 * @param timestamp the timestamp passed to the super constructor
 * @param maxIdleTime the max idle time set on the endpoint
 * @param protocol the protocol reported by the connection
 * @param extensions the extensions to chain, may be null
 * @param draft the draft number used in {@link #toString()}
 * @param maskgen the mask generator for outgoing frames, may be null
 * @throws IOException if setting up the endpoint fails
 */
public WebSocketConnectionD10(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions, int draft, MaskGen maskgen)
    throws IOException
{
    super(endpoint, timestamp);

    _context = Thread.currentThread().getContextClassLoader();

    if (endpoint instanceof AsyncEndPoint)
    {
        final AsyncEndPoint async = (AsyncEndPoint)endpoint;
        async.cancelIdle();
    }

    _draft = draft;
    _endp.setMaxIdleTime(maxIdleTime);

    // Remember which optional callback interfaces the websocket implements.
    _webSocket = websocket;
    _onFrame = (websocket instanceof OnFrame) ? (OnFrame)websocket : null;
    _onTextMessage = (websocket instanceof OnTextMessage) ? (OnTextMessage)websocket : null;
    _onBinaryMessage = (websocket instanceof OnBinaryMessage) ? (OnBinaryMessage)websocket : null;
    _onControl = (websocket instanceof OnControl) ? (OnControl)websocket : null;
    _generator = new WebSocketGeneratorD10(buffers, _endp, maskgen);

    // Chain the extensions: each one gets its successor (or the frame handler)
    // for inbound frames and its predecessor (or the generator) for outbound frames.
    _extensions = extensions;
    final int count = (extensions == null) ? 0 : extensions.size();
    for (int i = 0; i < count; i++)
    {
        extensions.get(i).bind(
            _connection,
            (i + 1 < count) ? extensions.get(i + 1) : _frameHandler,
            (i > 0) ? extensions.get(i - 1) : _generator);
    }

    if (count == 0)
    {
        _outbound = _generator;
        _inbound = _frameHandler;
    }
    else
    {
        _outbound = extensions.get(count - 1);
        _inbound = extensions.get(0);
    }

    _parser = new WebSocketParserD10(buffers, endpoint, _inbound, maskgen == null);

    _protocol = protocol;

    if (!(_endp instanceof SelectChannelEndPoint))
    {
        // Nothing to schedule for other endpoint types.
        _idle = new IdleCheck()
        {
            public void access(EndPoint endp)
            {
            }
        };
    }
    else
    {
        final SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
        scep.cancelIdle();
        _idle = new IdleCheck()
        {
            public void access(EndPoint endp)
            {
                scep.scheduleIdle();
            }
        };
        scep.scheduleIdle();
    }

    _maxTextMessageSize = buffers.getBufferSize();
    _maxBinaryMessageSize = -1;
}
197
198
199 public WebSocket.Connection getConnection()
200 {
201 return _connection;
202 }
203
204
205 public List<Extension> getExtensions()
206 {
207 if (_extensions==null)
208 return Collections.emptyList();
209
210 return _extensions;
211 }
212
213
214 public Connection handle() throws IOException
215 {
216 Thread current = Thread.currentThread();
217 ClassLoader oldcontext = current.getContextClassLoader();
218 current.setContextClassLoader(_context);
219 try
220 {
221
222 boolean progress=true;
223
224 while (progress)
225 {
226 int flushed=_generator.flushBuffer();
227 int filled=_parser.parseNext();
228
229 progress = flushed>0 || filled>0;
230
231 if (filled<0 || flushed<0)
232 {
233 _endp.close();
234 break;
235 }
236 }
237 }
238 catch(IOException e)
239 {
240 try
241 {
242 _endp.close();
243 }
244 catch(IOException e2)
245 {
246 Log.ignore(e2);
247 }
248 throw e;
249 }
250 finally
251 {
252 current.setContextClassLoader(oldcontext);
253 if (_endp.isOpen())
254 {
255 _generator.idle();
256 _idle.access(_endp);
257 if (_closedIn && _closedOut && _outbound.isBufferEmpty())
258 _endp.close();
259 else if (_endp.isInputShutdown() && !_closedIn)
260 closeIn(CLOSE_NOCLOSE,null);
261 else
262 checkWriteable();
263 }
264
265 }
266 return this;
267 }
268
269
270 public boolean isIdle()
271 {
272 return _parser.isBufferEmpty() && _outbound.isBufferEmpty();
273 }
274
275
276 @Override
277 public void idleExpired()
278 {
279 long idle = System.currentTimeMillis()-((SelectChannelEndPoint)_endp).getIdleTimestamp();
280 closeOut(WebSocketConnectionD10.CLOSE_NORMAL,"Idle for "+idle+"ms");
281 }
282
283
284 public boolean isSuspended()
285 {
286 return false;
287 }
288
289
290 public void closed()
291 {
292 final boolean closed;
293 synchronized (this)
294 {
295 closed=_closeCode==0;
296 if (closed)
297 _closeCode=WebSocketConnectionD10.CLOSE_NOCLOSE;
298 }
299 if (closed)
300 _webSocket.onClose(WebSocketConnectionD10.CLOSE_NOCLOSE,"closed");
301 }
302
303
304 public void closeIn(int code,String message)
305 {
306 Log.debug("ClosedIn {} {}",this,message);
307
308 final boolean closedOut;
309 final boolean closed;
310 synchronized (this)
311 {
312 closedOut=_closedOut;
313 _closedIn=true;
314 closed=_closeCode==0;
315 if (closed)
316 {
317 _closeCode=code;
318 _closeMessage=message;
319 }
320 }
321
322 try
323 {
324 if (closed)
325 _webSocket.onClose(code,message);
326 }
327 finally
328 {
329 try
330 {
331 if (closedOut)
332 _endp.close();
333 else
334 closeOut(code,message);
335 }
336 catch(IOException e)
337 {
338 Log.ignore(e);
339 }
340 }
341 }
342
343
344 public void closeOut(int code,String message)
345 {
346 Log.debug("ClosedOut {} {}",this,message);
347
348 final boolean close;
349 final boolean closed;
350 synchronized (this)
351 {
352 close=_closedIn || _closedOut;
353 _closedOut=true;
354 closed=_closeCode==0;
355 if (closed)
356 {
357 _closeCode=code;
358 _closeMessage=message;
359 }
360 }
361
362 try
363 {
364 if (closed)
365 _webSocket.onClose(code,message);
366 }
367 finally
368 {
369 try
370 {
371 if (close)
372 _endp.close();
373 else
374 {
375 if (code<=0)
376 code=WebSocketConnectionD10.CLOSE_NORMAL;
377 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
378 bytes[0]=(byte)(code/0x100);
379 bytes[1]=(byte)(code%0x100);
380 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD10.OP_CLOSE,bytes,0,bytes.length);
381 }
382 _outbound.flush();
383
384 }
385 catch(IOException e)
386 {
387 Log.ignore(e);
388 }
389 }
390 }
391
392
393 public void fillBuffersFrom(Buffer buffer)
394 {
395 _parser.fill(buffer);
396 }
397
398
399 private void checkWriteable()
400 {
401 if (!_outbound.isBufferEmpty() && _endp instanceof AsyncEndPoint)
402 {
403 ((AsyncEndPoint)_endp).scheduleWrite();
404 }
405 }
406
407
408
409
410 private class WSFrameConnection implements WebSocket.FrameConnection
411 {
412 volatile boolean _disconnecting;
413 int _maxTextMessage=WebSocketConnectionD10.this._maxTextMessageSize;
414 int _maxBinaryMessage=WebSocketConnectionD10.this._maxBinaryMessageSize;
415
416
417 public void sendMessage(String content) throws IOException
418 {
419 if (_closedOut)
420 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
421 byte[] data = content.getBytes(StringUtil.__UTF8);
422 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD10.OP_TEXT,data,0,data.length);
423 checkWriteable();
424 _idle.access(_endp);
425 }
426
427
428 public void sendMessage(byte[] content, int offset, int length) throws IOException
429 {
430 if (_closedOut)
431 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
432 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD10.OP_BINARY,content,offset,length);
433 checkWriteable();
434 _idle.access(_endp);
435 }
436
437
438 public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
439 {
440 if (_closedOut)
441 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
442 _outbound.addFrame(flags,opcode,content,offset,length);
443 checkWriteable();
444 _idle.access(_endp);
445 }
446
447
448 public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException
449 {
450 if (_closedOut)
451 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
452 _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
453 checkWriteable();
454 _idle.access(_endp);
455 }
456
457
458 public boolean isMessageComplete(byte flags)
459 {
460 return isLastFrame(flags);
461 }
462
463
464 public boolean isOpen()
465 {
466 return _endp!=null&&_endp.isOpen();
467 }
468
469
470 public void close(int code, String message)
471 {
472 if (_disconnecting)
473 return;
474 _disconnecting=true;
475 WebSocketConnectionD10.this.closeOut(code,message);
476 }
477
478
479 public void setMaxTextMessageSize(int size)
480 {
481 _maxTextMessage=size;
482 }
483
484
485 public void setMaxBinaryMessageSize(int size)
486 {
487 _maxBinaryMessage=size;
488 }
489
490
491 public int getMaxTextMessageSize()
492 {
493 return _maxTextMessage;
494 }
495
496
497 public int getMaxBinaryMessageSize()
498 {
499 return _maxBinaryMessage;
500 }
501
502
503 public String getProtocol()
504 {
505 return _protocol;
506 }
507
508
509 public byte binaryOpcode()
510 {
511 return OP_BINARY;
512 }
513
514
515 public byte textOpcode()
516 {
517 return OP_TEXT;
518 }
519
520
521 public byte continuationOpcode()
522 {
523 return OP_CONTINUATION;
524 }
525
526
527 public byte finMask()
528 {
529 return FLAG_FIN;
530 }
531
532
533 public boolean isControl(byte opcode)
534 {
535 return isControlFrame(opcode);
536 }
537
538
539 public boolean isText(byte opcode)
540 {
541 return opcode==OP_TEXT;
542 }
543
544
545 public boolean isBinary(byte opcode)
546 {
547 return opcode==OP_BINARY;
548 }
549
550
551 public boolean isContinuation(byte opcode)
552 {
553 return opcode==OP_CONTINUATION;
554 }
555
556
557 public boolean isClose(byte opcode)
558 {
559 return opcode==OP_CLOSE;
560 }
561
562
563 public boolean isPing(byte opcode)
564 {
565 return opcode==OP_PING;
566 }
567
568
569 public boolean isPong(byte opcode)
570 {
571 return opcode==OP_PONG;
572 }
573
574
575 public void disconnect()
576 {
577 close(CLOSE_NORMAL,null);
578 }
579
580
581 public void setFakeFragments(boolean fake)
582 {
583 _parser.setFakeFragments(fake);
584 }
585
586
587 public boolean isFakeFragments()
588 {
589 return _parser.isFakeFragments();
590 }
591
592
593 public String toString()
594 {
595 return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
596 }
597 }
598
599
600
601
602 private class WSFrameHandler implements WebSocketParser.FrameHandler
603 {
604 private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
605 private ByteArrayBuffer _aggregate;
606 private byte _opcode=-1;
607
608 public void onFrame(final byte flags, final byte opcode, final Buffer buffer)
609 {
610 boolean lastFrame = isLastFrame(flags);
611
612 synchronized(WebSocketConnectionD10.this)
613 {
614
615 if (_closedIn)
616 return;
617 }
618 try
619 {
620 byte[] array=buffer.array();
621
622
623 if (_onFrame!=null)
624 {
625 if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
626 return;
627 }
628
629 if (_onControl!=null && isControlFrame(opcode))
630 {
631 if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
632 return;
633 }
634
635 switch(opcode)
636 {
637 case WebSocketConnectionD10.OP_CONTINUATION:
638 {
639
640 if (_onTextMessage!=null && _opcode==WebSocketConnectionD10.OP_TEXT)
641 {
642 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
643 {
644
645 if (lastFrame)
646 {
647 _opcode=-1;
648 String msg =_utf8.toString();
649 _utf8.reset();
650 _onTextMessage.onMessage(msg);
651 }
652 }
653 else
654 textMessageTooLarge();
655 }
656
657 if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
658 {
659 if (checkBinaryMessageSize(_aggregate.length(),buffer.length()))
660 {
661 _aggregate.put(buffer);
662
663
664 if (lastFrame && _onBinaryMessage!=null)
665 {
666 try
667 {
668 _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
669 }
670 finally
671 {
672 _opcode=-1;
673 _aggregate.clear();
674 }
675 }
676 }
677 }
678 break;
679 }
680 case WebSocketConnectionD10.OP_PING:
681 {
682 Log.debug("PING {}",this);
683 if (!_closedOut)
684 _connection.sendControl(WebSocketConnectionD10.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
685 break;
686 }
687
688 case WebSocketConnectionD10.OP_PONG:
689 {
690 Log.debug("PONG {}",this);
691 break;
692 }
693
694 case WebSocketConnectionD10.OP_CLOSE:
695 {
696 int code=WebSocketConnectionD10.CLOSE_NOCODE;
697 String message=null;
698 if (buffer.length()>=2)
699 {
700 code=buffer.array()[buffer.getIndex()]*0x100+buffer.array()[buffer.getIndex()+1];
701 if (buffer.length()>2)
702 message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
703 }
704 closeIn(code,message);
705 break;
706 }
707
708 case WebSocketConnectionD10.OP_TEXT:
709 {
710 if(_onTextMessage!=null)
711 {
712 if (_connection.getMaxTextMessageSize()<=0)
713 {
714
715 if (lastFrame)
716 _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
717 }
718 else if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
719 {
720 if (lastFrame)
721 {
722 String msg =_utf8.toString();
723 _utf8.reset();
724 _onTextMessage.onMessage(msg);
725 }
726 else
727 {
728 _opcode=WebSocketConnectionD10.OP_TEXT;
729 }
730 }
731 else
732 textMessageTooLarge();
733 }
734 break;
735 }
736
737 default:
738 {
739 if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length()))
740 {
741 if (lastFrame)
742 {
743 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
744 }
745 else if (_connection.getMaxBinaryMessageSize()>=0)
746 {
747 _opcode=opcode;
748
749 if (_aggregate==null)
750 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
751 _aggregate.put(buffer);
752 }
753 }
754 }
755 }
756 }
757 catch(ThreadDeath th)
758 {
759 throw th;
760 }
761 catch(Throwable th)
762 {
763 Log.warn(th);
764 }
765 }
766
767 private boolean checkBinaryMessageSize(int bufferLen, int length)
768 {
769 int max = _connection.getMaxBinaryMessageSize();
770 if (max>0 && (bufferLen+length)>max)
771 {
772 _connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
773 _opcode=-1;
774 if (_aggregate!=null)
775 _aggregate.clear();
776 return false;
777 }
778 return true;
779 }
780
781 private void textMessageTooLarge()
782 {
783 _connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
784
785 _opcode=-1;
786 _utf8.reset();
787 }
788
789 public void close(int code,String message)
790 {
791 if (code!=CLOSE_NORMAL)
792 Log.warn("Close: "+code+" "+message);
793 _connection.close(code,message);
794 }
795
796 public String toString()
797 {
798 return WebSocketConnectionD10.this.toString()+"FH";
799 }
800 }
801
802
803 private interface IdleCheck
804 {
805 void access(EndPoint endp);
806 }
807
808
809 public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
810 {
811 String uri=request.getRequestURI();
812 String query=request.getQueryString();
813 if (query!=null && query.length()>0)
814 uri+="?"+query;
815 String key = request.getHeader("Sec-WebSocket-Key");
816
817 response.setHeader("Upgrade","WebSocket");
818 response.addHeader("Connection","Upgrade");
819 response.addHeader("Sec-WebSocket-Accept",hashKey(key));
820 if (subprotocol!=null)
821 response.addHeader("Sec-WebSocket-Protocol",subprotocol);
822
823 for(Extension ext : _extensions)
824 response.addHeader("Sec-WebSocket-Extensions",ext.getParameterizedName());
825
826 response.sendError(101);
827
828 if (_onFrame!=null)
829 _onFrame.onHandshake(_connection);
830 _webSocket.onOpen(_connection);
831 }
832
833
834 public static String hashKey(String key)
835 {
836 try
837 {
838 MessageDigest md = MessageDigest.getInstance("SHA1");
839 md.update(key.getBytes("UTF-8"));
840 md.update(MAGIC);
841 return new String(B64Code.encode(md.digest()));
842 }
843 catch (Exception e)
844 {
845 throw new RuntimeException(e);
846 }
847 }
848
849
850 public String toString()
851 {
852 return "WS/D"+_draft+"-"+_endp;
853 }
854 }