1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.websocket;
15
16 import java.io.IOException;
17 import java.io.UnsupportedEncodingException;
18 import java.security.MessageDigest;
19 import java.util.Collections;
20 import java.util.List;
21 import javax.servlet.http.HttpServletRequest;
22 import javax.servlet.http.HttpServletResponse;
23
24 import org.eclipse.jetty.io.AbstractConnection;
25 import org.eclipse.jetty.io.AsyncEndPoint;
26 import org.eclipse.jetty.io.Buffer;
27 import org.eclipse.jetty.io.ByteArrayBuffer;
28 import org.eclipse.jetty.io.Connection;
29 import org.eclipse.jetty.io.EndPoint;
30 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
31 import org.eclipse.jetty.util.B64Code;
32 import org.eclipse.jetty.util.StringUtil;
33 import org.eclipse.jetty.util.Utf8StringBuilder;
34 import org.eclipse.jetty.util.log.Log;
35 import org.eclipse.jetty.util.log.Logger;
36 import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
37 import org.eclipse.jetty.websocket.WebSocket.OnControl;
38 import org.eclipse.jetty.websocket.WebSocket.OnFrame;
39 import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
40
41 public class WebSocketConnectionD08 extends AbstractConnection implements WebSocketConnection
42 {
43 private static final Logger LOG = Log.getLogger(WebSocketConnectionD08.class);
44
45 final static byte OP_CONTINUATION = 0x00;
46 final static byte OP_TEXT = 0x01;
47 final static byte OP_BINARY = 0x02;
48 final static byte OP_EXT_DATA = 0x03;
49
50 final static byte OP_CONTROL = 0x08;
51 final static byte OP_CLOSE = 0x08;
52 final static byte OP_PING = 0x09;
53 final static byte OP_PONG = 0x0A;
54 final static byte OP_EXT_CTRL = 0x0B;
55
56 final static int CLOSE_NORMAL=1000;
57 final static int CLOSE_SHUTDOWN=1001;
58 final static int CLOSE_PROTOCOL=1002;
59 final static int CLOSE_BADDATA=1003;
60 final static int CLOSE_NOCODE=1005;
61 final static int CLOSE_NOCLOSE=1006;
62 final static int CLOSE_NOTUTF8=1007;
63
64 final static int FLAG_FIN=0x8;
65
66 final static int VERSION=8;
67
68 static boolean isLastFrame(byte flags)
69 {
70 return (flags&FLAG_FIN)!=0;
71 }
72
73 static boolean isControlFrame(byte opcode)
74 {
75 return (opcode&OP_CONTROL)!=0;
76 }
77
78 private final static byte[] MAGIC;
79 private final IdleCheck _idle;
80 private final List<Extension> _extensions;
81 private final WebSocketParserD08 _parser;
82 private final WebSocketParser.FrameHandler _inbound;
83 private final WebSocketGeneratorD08 _generator;
84 private final WebSocketGenerator _outbound;
85 private final WebSocket _webSocket;
86 private final OnFrame _onFrame;
87 private final OnBinaryMessage _onBinaryMessage;
88 private final OnTextMessage _onTextMessage;
89 private final OnControl _onControl;
90 private final String _protocol;
91 private final int _draft;
92 private final ClassLoader _context;
93 private volatile int _closeCode;
94 private volatile String _closeMessage;
95 private volatile boolean _closedIn;
96 private volatile boolean _closedOut;
97 private int _maxTextMessageSize=-1;
98 private int _maxBinaryMessageSize=-1;
99
100 static
101 {
102 try
103 {
104 MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
105 }
106 catch (UnsupportedEncodingException e)
107 {
108 throw new RuntimeException(e);
109 }
110 }
111
112 private final WebSocketParser.FrameHandler _frameHandler= new WSFrameHandler();
113
114 private final WebSocket.FrameConnection _connection = new WSFrameConnection();
115
116
117
118 public WebSocketConnectionD08(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft)
119 throws IOException
120 {
121 this(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol,extensions,draft,null);
122 }
123
124
125 public WebSocketConnectionD08(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft, MaskGen maskgen)
126 throws IOException
127 {
128 super(endpoint,timestamp);
129
130 _context=Thread.currentThread().getContextClassLoader();
131
132 if (endpoint instanceof AsyncEndPoint)
133 ((AsyncEndPoint)endpoint).cancelIdle();
134
135 _draft=draft;
136 _endp.setMaxIdleTime(maxIdleTime);
137
138 _webSocket = websocket;
139 _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
140 _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
141 _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
142 _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
143 _generator = new WebSocketGeneratorD08(buffers, _endp,maskgen);
144
145 _extensions=extensions;
146 if (_extensions!=null)
147 {
148 int e=0;
149 for (Extension extension : _extensions)
150 {
151 extension.bind(
152 _connection,
153 e==extensions.size()-1?_frameHandler:extensions.get(e+1),
154 e==0?_generator:extensions.get(e-1));
155 e++;
156 }
157 }
158
159 _outbound=(_extensions==null||_extensions.size()==0)?_generator:extensions.get(extensions.size()-1);
160 _inbound=(_extensions==null||_extensions.size()==0)?_frameHandler:extensions.get(0);
161
162 _parser = new WebSocketParserD08(buffers, endpoint,_inbound,maskgen==null);
163
164 _protocol=protocol;
165
166 if (_endp instanceof SelectChannelEndPoint)
167 {
168 final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp;
169 scep.cancelIdle();
170 _idle=new IdleCheck()
171 {
172 public void access(EndPoint endp)
173 {
174 scep.scheduleIdle();
175 }
176 };
177 scep.scheduleIdle();
178 }
179 else
180 {
181 _idle = new IdleCheck()
182 {
183 public void access(EndPoint endp)
184 {}
185 };
186 }
187 }
188
189
190 public WebSocket.Connection getConnection()
191 {
192 return _connection;
193 }
194
195
196 public List<Extension> getExtensions()
197 {
198 if (_extensions==null)
199 return Collections.emptyList();
200
201 return _extensions;
202 }
203
204
205 public Connection handle() throws IOException
206 {
207 Thread current = Thread.currentThread();
208 ClassLoader oldcontext = current.getContextClassLoader();
209 current.setContextClassLoader(_context);
210 try
211 {
212
213 boolean progress=true;
214
215 while (progress)
216 {
217 int flushed=_generator.flushBuffer();
218 int filled=_parser.parseNext();
219
220 progress = flushed>0 || filled>0;
221
222 if (filled<0 || flushed<0)
223 {
224 _endp.close();
225 break;
226 }
227 }
228 }
229 catch(IOException e)
230 {
231 try
232 {
233 _endp.close();
234 }
235 catch(IOException e2)
236 {
237 LOG.ignore(e2);
238 }
239 throw e;
240 }
241 finally
242 {
243 current.setContextClassLoader(oldcontext);
244 _parser.returnBuffer();
245 _generator.returnBuffer();
246 if (_endp.isOpen())
247 {
248 _idle.access(_endp);
249 if (_closedIn && _closedOut && _outbound.isBufferEmpty())
250 _endp.close();
251 else if (_endp.isInputShutdown() && !_closedIn)
252 closeIn(CLOSE_NOCLOSE,null);
253 else
254 checkWriteable();
255 }
256 }
257 return this;
258 }
259
260
261 public boolean isIdle()
262 {
263 return _parser.isBufferEmpty() && _outbound.isBufferEmpty();
264 }
265
266
267 @Override
268 public void idleExpired()
269 {
270 long idle = System.currentTimeMillis()-((SelectChannelEndPoint)_endp).getIdleTimestamp();
271 closeOut(WebSocketConnectionD08.CLOSE_NORMAL,"Idle for "+idle+"ms > "+_endp.getMaxIdleTime()+"ms");
272 }
273
274
275 public boolean isSuspended()
276 {
277 return false;
278 }
279
280
281 public void closed()
282 {
283 final boolean closed;
284 synchronized (this)
285 {
286 closed=_closeCode==0;
287 if (closed)
288 _closeCode=WebSocketConnectionD08.CLOSE_NOCLOSE;
289 }
290 if (closed)
291 _webSocket.onClose(WebSocketConnectionD08.CLOSE_NOCLOSE,"closed");
292 }
293
294
295 public void closeIn(int code,String message)
296 {
297 LOG.debug("ClosedIn {} {}",this,message);
298
299 final boolean closedOut;
300 final boolean closed;
301 synchronized (this)
302 {
303 closedOut=_closedOut;
304 _closedIn=true;
305 closed=_closeCode==0;
306 if (closed)
307 {
308 _closeCode=code;
309 _closeMessage=message;
310 }
311 }
312
313 try
314 {
315 if (closed)
316 _webSocket.onClose(code,message);
317 }
318 finally
319 {
320 try
321 {
322 if (closedOut)
323 _endp.close();
324 else
325 closeOut(code,message);
326 }
327 catch(IOException e)
328 {
329 LOG.ignore(e);
330 }
331 }
332 }
333
334
335 public void closeOut(int code,String message)
336 {
337 LOG.debug("ClosedOut {} {}",this,message);
338
339 final boolean close;
340 final boolean closed;
341 synchronized (this)
342 {
343 close=_closedIn || _closedOut;
344 _closedOut=true;
345 closed=_closeCode==0;
346 if (closed)
347 {
348 _closeCode=code;
349 _closeMessage=message;
350 }
351 }
352
353 try
354 {
355 if (closed)
356 _webSocket.onClose(code,message);
357 }
358 finally
359 {
360 try
361 {
362 if (close)
363 _endp.close();
364 else
365 {
366 if (code<=0)
367 code=WebSocketConnectionD08.CLOSE_NORMAL;
368 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
369 bytes[0]=(byte)(code/0x100);
370 bytes[1]=(byte)(code%0x100);
371 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD08.OP_CLOSE,bytes,0,bytes.length);
372 }
373 _outbound.flush();
374
375 }
376 catch(IOException e)
377 {
378 LOG.ignore(e);
379 }
380 }
381 }
382
383
384 public void fillBuffersFrom(Buffer buffer)
385 {
386 _parser.fill(buffer);
387 }
388
389
390 private void checkWriteable()
391 {
392 if (!_outbound.isBufferEmpty() && _endp instanceof AsyncEndPoint)
393 {
394 ((AsyncEndPoint)_endp).scheduleWrite();
395 }
396 }
397
398
399
400
401 private class WSFrameConnection implements WebSocket.FrameConnection
402 {
403 volatile boolean _disconnecting;
404
405
406 public void sendMessage(String content) throws IOException
407 {
408 if (_closedOut)
409 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
410 byte[] data = content.getBytes(StringUtil.__UTF8);
411 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD08.OP_TEXT,data,0,data.length);
412 checkWriteable();
413 _idle.access(_endp);
414 }
415
416
417 public void sendMessage(byte[] content, int offset, int length) throws IOException
418 {
419 if (_closedOut)
420 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
421 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD08.OP_BINARY,content,offset,length);
422 checkWriteable();
423 _idle.access(_endp);
424 }
425
426
427 public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
428 {
429 if (_closedOut)
430 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
431 _outbound.addFrame(flags,opcode,content,offset,length);
432 checkWriteable();
433 _idle.access(_endp);
434 }
435
436
437 public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException
438 {
439 if (_closedOut)
440 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
441 _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
442 checkWriteable();
443 _idle.access(_endp);
444 }
445
446
447 public boolean isMessageComplete(byte flags)
448 {
449 return isLastFrame(flags);
450 }
451
452
453 public boolean isOpen()
454 {
455 return _endp!=null&&_endp.isOpen();
456 }
457
458
459 public void close(int code, String message)
460 {
461 if (_disconnecting)
462 return;
463 _disconnecting=true;
464 WebSocketConnectionD08.this.closeOut(code,message);
465 }
466
467
468 public void setMaxIdleTime(int ms)
469 {
470 try
471 {
472 _endp.setMaxIdleTime(ms);
473 }
474 catch(IOException e)
475 {
476 LOG.warn(e);
477 }
478 }
479
480
481 public void setMaxTextMessageSize(int size)
482 {
483 _maxTextMessageSize=size;
484 }
485
486
487 public void setMaxBinaryMessageSize(int size)
488 {
489 _maxBinaryMessageSize=size;
490 }
491
492
493 public int getMaxIdleTime()
494 {
495 return _endp.getMaxIdleTime();
496 }
497
498
499 public int getMaxTextMessageSize()
500 {
501 return _maxTextMessageSize;
502 }
503
504
505 public int getMaxBinaryMessageSize()
506 {
507 return _maxBinaryMessageSize;
508 }
509
510
511 public String getProtocol()
512 {
513 return _protocol;
514 }
515
516
517 public byte binaryOpcode()
518 {
519 return OP_BINARY;
520 }
521
522
523 public byte textOpcode()
524 {
525 return OP_TEXT;
526 }
527
528
529 public byte continuationOpcode()
530 {
531 return OP_CONTINUATION;
532 }
533
534
535 public byte finMask()
536 {
537 return FLAG_FIN;
538 }
539
540
541 public boolean isControl(byte opcode)
542 {
543 return isControlFrame(opcode);
544 }
545
546
547 public boolean isText(byte opcode)
548 {
549 return opcode==OP_TEXT;
550 }
551
552
553 public boolean isBinary(byte opcode)
554 {
555 return opcode==OP_BINARY;
556 }
557
558
559 public boolean isContinuation(byte opcode)
560 {
561 return opcode==OP_CONTINUATION;
562 }
563
564
565 public boolean isClose(byte opcode)
566 {
567 return opcode==OP_CLOSE;
568 }
569
570
571 public boolean isPing(byte opcode)
572 {
573 return opcode==OP_PING;
574 }
575
576
577 public boolean isPong(byte opcode)
578 {
579 return opcode==OP_PONG;
580 }
581
582
583 public void disconnect()
584 {
585 close(CLOSE_NORMAL,null);
586 }
587
588
589 public void setAllowFrameFragmentation(boolean allowFragmentation)
590 {
591 _parser.setFakeFragments(allowFragmentation);
592 }
593
594
595 public boolean isAllowFrameFragmentation()
596 {
597 return _parser.isFakeFragments();
598 }
599
600
601 @Override
602 public String toString()
603 {
604 return this.getClass().getSimpleName()+"D08@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
605 }
606 }
607
608
609
610
611 private class WSFrameHandler implements WebSocketParser.FrameHandler
612 {
613 private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
614 private ByteArrayBuffer _aggregate;
615 private byte _opcode=-1;
616
617 public void onFrame(final byte flags, final byte opcode, final Buffer buffer)
618 {
619 boolean lastFrame = isLastFrame(flags);
620
621 synchronized(WebSocketConnectionD08.this)
622 {
623
624 if (_closedIn)
625 return;
626 }
627 try
628 {
629 byte[] array=buffer.array();
630
631
632 if (_onFrame!=null)
633 {
634 if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
635 return;
636 }
637
638 if (_onControl!=null && isControlFrame(opcode))
639 {
640 if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
641 return;
642 }
643
644 switch(opcode)
645 {
646 case WebSocketConnectionD08.OP_CONTINUATION:
647 {
648
649 if (_onTextMessage!=null && _opcode==WebSocketConnectionD08.OP_TEXT)
650 {
651 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
652 {
653
654 if (lastFrame)
655 {
656 _opcode=-1;
657 String msg =_utf8.toString();
658 _utf8.reset();
659 _onTextMessage.onMessage(msg);
660 }
661 }
662 else
663 textMessageTooLarge();
664 }
665
666 if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
667 {
668 if (checkBinaryMessageSize(_aggregate.length(),buffer.length()))
669 {
670 _aggregate.put(buffer);
671
672
673 if (lastFrame && _onBinaryMessage!=null)
674 {
675 try
676 {
677 _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
678 }
679 finally
680 {
681 _opcode=-1;
682 _aggregate.clear();
683 }
684 }
685 }
686 }
687 break;
688 }
689 case WebSocketConnectionD08.OP_PING:
690 {
691 LOG.debug("PING {}",this);
692 if (!_closedOut)
693 _connection.sendControl(WebSocketConnectionD08.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
694 break;
695 }
696
697 case WebSocketConnectionD08.OP_PONG:
698 {
699 LOG.debug("PONG {}",this);
700 break;
701 }
702
703 case WebSocketConnectionD08.OP_CLOSE:
704 {
705 int code=WebSocketConnectionD08.CLOSE_NOCODE;
706 String message=null;
707 if (buffer.length()>=2)
708 {
709 code=buffer.array()[buffer.getIndex()]*0x100+buffer.array()[buffer.getIndex()+1];
710 if (buffer.length()>2)
711 message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
712 }
713 closeIn(code,message);
714 break;
715 }
716
717 case WebSocketConnectionD08.OP_TEXT:
718 {
719 if(_onTextMessage!=null)
720 {
721 if (_connection.getMaxTextMessageSize()<=0)
722 {
723
724 if (lastFrame)
725 _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
726 else
727 {
728 LOG.warn("Frame discarded. Text aggregation disabled for {}",_endp);
729 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Text frame aggregation disabled");
730 }
731 }
732
733 else if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
734 {
735 if (lastFrame)
736 {
737 String msg =_utf8.toString();
738 _utf8.reset();
739 _onTextMessage.onMessage(msg);
740 }
741 else
742 {
743 _opcode=WebSocketConnectionD08.OP_TEXT;
744 }
745 }
746 else
747 textMessageTooLarge();
748 }
749 break;
750 }
751
752 default:
753 {
754 if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length()))
755 {
756 if (lastFrame)
757 {
758 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
759 }
760 else if (_connection.getMaxBinaryMessageSize()>=0)
761 {
762 _opcode=opcode;
763 if (_aggregate==null)
764 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
765 _aggregate.put(buffer);
766 }
767 else
768 {
769 LOG.warn("Frame discarded. Binary aggregation disabed for {}",_endp);
770 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Binary frame aggregation disabled");
771 }
772 }
773 }
774 }
775 }
776 catch(ThreadDeath th)
777 {
778 throw th;
779 }
780 catch(Throwable th)
781 {
782 LOG.warn(th);
783 }
784 }
785
786 private boolean checkBinaryMessageSize(int bufferLen, int length)
787 {
788 int max = _connection.getMaxBinaryMessageSize();
789 if (max>0 && (bufferLen+length)>max)
790 {
791 LOG.warn("Binary message too large > {}B for {}",_connection.getMaxBinaryMessageSize(),_endp);
792 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Message size > "+_connection.getMaxBinaryMessageSize());
793 _opcode=-1;
794 if (_aggregate!=null)
795 _aggregate.clear();
796 return false;
797 }
798 return true;
799 }
800
801 private void textMessageTooLarge()
802 {
803 LOG.warn("Text message too large > {} chars for {}",_connection.getMaxTextMessageSize(),_endp);
804 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
805
806 _opcode=-1;
807 _utf8.reset();
808 }
809
810 public void close(int code,String message)
811 {
812 if (code!=CLOSE_NORMAL)
813 LOG.warn("Close: "+code+" "+message);
814 _connection.close(code,message);
815 }
816
817 @Override
818 public String toString()
819 {
820 return WebSocketConnectionD08.this.toString()+"FH";
821 }
822 }
823
824
825 private interface IdleCheck
826 {
827 void access(EndPoint endp);
828 }
829
830
831 public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
832 {
833 String key = request.getHeader("Sec-WebSocket-Key");
834
835 response.setHeader("Upgrade", "WebSocket");
836 response.addHeader("Connection","Upgrade");
837 response.addHeader("Sec-WebSocket-Accept",hashKey(key));
838 if (subprotocol!=null)
839 response.addHeader("Sec-WebSocket-Protocol",subprotocol);
840
841 for(Extension ext : _extensions)
842 response.addHeader("Sec-WebSocket-Extensions",ext.getParameterizedName());
843
844 response.sendError(101);
845
846 if (_onFrame!=null)
847 _onFrame.onHandshake(_connection);
848 _webSocket.onOpen(_connection);
849 }
850
851
852 public static String hashKey(String key)
853 {
854 try
855 {
856 MessageDigest md = MessageDigest.getInstance("SHA1");
857 md.update(key.getBytes("UTF-8"));
858 md.update(MAGIC);
859 return new String(B64Code.encode(md.digest()));
860 }
861 catch (Exception e)
862 {
863 throw new RuntimeException(e);
864 }
865 }
866
867
868 @Override
869 public String toString()
870 {
871 return "WS/D"+_draft+"-"+_endp;
872 }
873 }