1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.websocket;
15
16 import java.io.IOException;
17 import java.io.UnsupportedEncodingException;
18 import java.security.MessageDigest;
19 import java.util.Collections;
20 import java.util.List;
21
22 import javax.servlet.http.HttpServletRequest;
23 import javax.servlet.http.HttpServletResponse;
24
25 import org.eclipse.jetty.io.AbstractConnection;
26 import org.eclipse.jetty.io.AsyncEndPoint;
27 import org.eclipse.jetty.io.Buffer;
28 import org.eclipse.jetty.io.ByteArrayBuffer;
29 import org.eclipse.jetty.io.Connection;
30 import org.eclipse.jetty.io.EndPoint;
31 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
32 import org.eclipse.jetty.util.B64Code;
33 import org.eclipse.jetty.util.StringUtil;
34 import org.eclipse.jetty.util.Utf8StringBuilder;
35 import org.eclipse.jetty.util.log.Log;
36 import org.eclipse.jetty.util.log.Logger;
37 import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
38 import org.eclipse.jetty.websocket.WebSocket.OnControl;
39 import org.eclipse.jetty.websocket.WebSocket.OnFrame;
40 import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
41
42 public class WebSocketConnectionD08 extends AbstractConnection implements WebSocketConnection
43 {
44 private static final Logger LOG = Log.getLogger(WebSocketConnectionD08.class);
45
46 final static byte OP_CONTINUATION = 0x00;
47 final static byte OP_TEXT = 0x01;
48 final static byte OP_BINARY = 0x02;
49 final static byte OP_EXT_DATA = 0x03;
50
51 final static byte OP_CONTROL = 0x08;
52 final static byte OP_CLOSE = 0x08;
53 final static byte OP_PING = 0x09;
54 final static byte OP_PONG = 0x0A;
55 final static byte OP_EXT_CTRL = 0x0B;
56
57 final static int CLOSE_NORMAL=1000;
58 final static int CLOSE_SHUTDOWN=1001;
59 final static int CLOSE_PROTOCOL=1002;
60 final static int CLOSE_BADDATA=1003;
61 final static int CLOSE_NOCODE=1005;
62 final static int CLOSE_NOCLOSE=1006;
63 final static int CLOSE_NOTUTF8=1007;
64
65 final static int FLAG_FIN=0x8;
66
67 final static int VERSION=8;
68
69 static boolean isLastFrame(byte flags)
70 {
71 return (flags&FLAG_FIN)!=0;
72 }
73
74 static boolean isControlFrame(byte opcode)
75 {
76 return (opcode&OP_CONTROL)!=0;
77 }
78
/** Bytes appended to the client key by hashKey() before digesting. */
private static final byte[] MAGIC;

static
{
    try
    {
        MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
    }
    catch (UnsupportedEncodingException unsupported)
    {
        throw new RuntimeException(unsupported);
    }
}

/** Extensions passed to the constructor; may be null. */
private final List<Extension> _extensions;
private final WebSocketParserD08 _parser;
/** Handler given to the parser: the first extension, or _frameHandler when there are no extensions. */
private final WebSocketParser.FrameHandler _inbound;
private final WebSocketGeneratorD08 _generator;
/** Generator frames are added to: the last extension, or _generator when there are no extensions. */
private final WebSocketGenerator _outbound;
private final WebSocket _webSocket;
// The following four are _webSocket cast to the optional callback interfaces, or null when not implemented.
private final OnFrame _onFrame;
private final OnBinaryMessage _onBinaryMessage;
private final OnTextMessage _onTextMessage;
private final OnControl _onControl;
private final String _protocol;
private final int _draft;
/** Context class loader captured in the constructor and installed for the duration of handle(). */
private final ClassLoader _context;
/** Zero until the first close is recorded by onClose(), closeIn() or closeOut(). */
private volatile int _closeCode;
private volatile String _closeMessage;
private volatile boolean _closedIn;
private volatile boolean _closedOut;
/** Values <= 0 make the frame handler refuse fragmented text messages. */
private int _maxTextMessageSize = -1;
/** Negative values make the frame handler refuse fragmented binary messages. */
private int _maxBinaryMessageSize = -1;

private final WebSocketParser.FrameHandler _frameHandler = new WSFrameHandler();

private final WebSocket.FrameConnection _connection = new WSFrameConnection();
115
116
117
/**
 * Creates a connection without a mask generator by delegating to the
 * nine-argument constructor with a null {@code MaskGen}.
 *
 * @throws IOException if the delegated constructor throws it
 */
public WebSocketConnectionD08(WebSocket websocket,
                              EndPoint endpoint,
                              WebSocketBuffers buffers,
                              long timestamp,
                              int maxIdleTime,
                              String protocol,
                              List<Extension> extensions,
                              int draft)
    throws IOException
{
    this(websocket, endpoint, buffers, timestamp, maxIdleTime, protocol, extensions, draft, null);
}
123
124
125 public WebSocketConnectionD08(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft, MaskGen maskgen)
126 throws IOException
127 {
128 super(endpoint,timestamp);
129
130 _context=Thread.currentThread().getContextClassLoader();
131
132 _draft=draft;
133 _endp.setMaxIdleTime(maxIdleTime);
134
135 _webSocket = websocket;
136 _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
137 _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
138 _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
139 _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
140 _generator = new WebSocketGeneratorD08(buffers, _endp,maskgen);
141
142 _extensions=extensions;
143 if (_extensions!=null)
144 {
145 int e=0;
146 for (Extension extension : _extensions)
147 {
148 extension.bind(
149 _connection,
150 e==extensions.size()-1?_frameHandler:extensions.get(e+1),
151 e==0?_generator:extensions.get(e-1));
152 e++;
153 }
154 }
155
156 _outbound=(_extensions==null||_extensions.size()==0)?_generator:extensions.get(extensions.size()-1);
157 _inbound=(_extensions==null||_extensions.size()==0)?_frameHandler:extensions.get(0);
158
159 _parser = new WebSocketParserD08(buffers, endpoint,_inbound,maskgen==null);
160
161 _protocol=protocol;
162
163 }
164
165
166 public WebSocket.Connection getConnection()
167 {
168 return _connection;
169 }
170
171
172 public List<Extension> getExtensions()
173 {
174 if (_extensions==null)
175 return Collections.emptyList();
176
177 return _extensions;
178 }
179
180
181 public Connection handle() throws IOException
182 {
183 Thread current = Thread.currentThread();
184 ClassLoader oldcontext = current.getContextClassLoader();
185 current.setContextClassLoader(_context);
186 try
187 {
188
189 boolean progress=true;
190
191 while (progress)
192 {
193 int flushed=_generator.flushBuffer();
194 int filled=_parser.parseNext();
195
196 progress = flushed>0 || filled>0;
197
198 if (filled<0 || flushed<0)
199 {
200 _endp.close();
201 break;
202 }
203 }
204 }
205 catch(IOException e)
206 {
207 try
208 {
209 _endp.close();
210 }
211 catch(IOException e2)
212 {
213 LOG.ignore(e2);
214 }
215 throw e;
216 }
217 finally
218 {
219 current.setContextClassLoader(oldcontext);
220 _parser.returnBuffer();
221 _generator.returnBuffer();
222 if (_endp.isOpen())
223 {
224 if (_closedIn && _closedOut && _outbound.isBufferEmpty())
225 _endp.close();
226 else if (_endp.isInputShutdown() && !_closedIn)
227 closeIn(CLOSE_NOCLOSE,null);
228 else
229 checkWriteable();
230 }
231 }
232 return this;
233 }
234
235
236 public void onInputShutdown() throws IOException
237 {
238
239 }
240
241
242 public boolean isIdle()
243 {
244 return _parser.isBufferEmpty() && _outbound.isBufferEmpty();
245 }
246
247
248 @Override
249 public void onIdleExpired()
250 {
251 long idle = System.currentTimeMillis()-((SelectChannelEndPoint)_endp).getIdleTimestamp();
252 closeOut(WebSocketConnectionD08.CLOSE_NORMAL,"Idle for "+idle+"ms > "+_endp.getMaxIdleTime()+"ms");
253 }
254
255
256 public boolean isSuspended()
257 {
258 return false;
259 }
260
261
262 public void onClose()
263 {
264 final boolean closed;
265 synchronized (this)
266 {
267 closed=_closeCode==0;
268 if (closed)
269 _closeCode=WebSocketConnectionD08.CLOSE_NOCLOSE;
270 }
271 if (closed)
272 _webSocket.onClose(WebSocketConnectionD08.CLOSE_NOCLOSE,"closed");
273 }
274
275
276 public void closeIn(int code,String message)
277 {
278 LOG.debug("ClosedIn {} {}",this,message);
279
280 final boolean closedOut;
281 final boolean closed;
282 synchronized (this)
283 {
284 closedOut=_closedOut;
285 _closedIn=true;
286 closed=_closeCode==0;
287 if (closed)
288 {
289 _closeCode=code;
290 _closeMessage=message;
291 }
292 }
293
294 try
295 {
296 if (closed)
297 _webSocket.onClose(code,message);
298 }
299 finally
300 {
301 try
302 {
303 if (closedOut)
304 _endp.close();
305 else
306 closeOut(code,message);
307 }
308 catch(IOException e)
309 {
310 LOG.ignore(e);
311 }
312 }
313 }
314
315
316 public void closeOut(int code,String message)
317 {
318 LOG.debug("ClosedOut {} {}",this,message);
319
320 final boolean close;
321 final boolean closed;
322 synchronized (this)
323 {
324 close=_closedIn || _closedOut;
325 _closedOut=true;
326 closed=_closeCode==0;
327 if (closed)
328 {
329 _closeCode=code;
330 _closeMessage=message;
331 }
332 }
333
334 try
335 {
336 if (closed)
337 _webSocket.onClose(code,message);
338 }
339 finally
340 {
341 try
342 {
343 if (close)
344 _endp.close();
345 else
346 {
347 if (code<=0)
348 code=WebSocketConnectionD08.CLOSE_NORMAL;
349 byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
350 bytes[0]=(byte)(code/0x100);
351 bytes[1]=(byte)(code%0x100);
352 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD08.OP_CLOSE,bytes,0,bytes.length);
353 }
354 _outbound.flush();
355
356 }
357 catch(IOException e)
358 {
359 LOG.ignore(e);
360 }
361 }
362 }
363
364
365 public void fillBuffersFrom(Buffer buffer)
366 {
367 _parser.fill(buffer);
368 }
369
370
371 private void checkWriteable()
372 {
373 if (!_outbound.isBufferEmpty() && _endp instanceof AsyncEndPoint)
374 {
375 ((AsyncEndPoint)_endp).scheduleWrite();
376 }
377 }
378
379
380
381
382 private class WSFrameConnection implements WebSocket.FrameConnection
383 {
384 volatile boolean _disconnecting;
385
386
387 public void sendMessage(String content) throws IOException
388 {
389 if (_closedOut)
390 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
391 byte[] data = content.getBytes(StringUtil.__UTF8);
392 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD08.OP_TEXT,data,0,data.length);
393 checkWriteable();
394 }
395
396
397 public void sendMessage(byte[] content, int offset, int length) throws IOException
398 {
399 if (_closedOut)
400 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
401 _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD08.OP_BINARY,content,offset,length);
402 checkWriteable();
403 }
404
405
406 public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
407 {
408 if (_closedOut)
409 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
410 _outbound.addFrame(flags,opcode,content,offset,length);
411 checkWriteable();
412 }
413
414
415 public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException
416 {
417 if (_closedOut)
418 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
419 _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
420 checkWriteable();
421 }
422
423
424 public boolean isMessageComplete(byte flags)
425 {
426 return isLastFrame(flags);
427 }
428
429
430 public boolean isOpen()
431 {
432 return _endp!=null&&_endp.isOpen();
433 }
434
435
436 public void close(int code, String message)
437 {
438 if (_disconnecting)
439 return;
440 _disconnecting=true;
441 WebSocketConnectionD08.this.closeOut(code,message);
442 }
443
444
445 public void setMaxIdleTime(int ms)
446 {
447 try
448 {
449 _endp.setMaxIdleTime(ms);
450 }
451 catch(IOException e)
452 {
453 LOG.warn(e);
454 }
455 }
456
457
458 public void setMaxTextMessageSize(int size)
459 {
460 _maxTextMessageSize=size;
461 }
462
463
464 public void setMaxBinaryMessageSize(int size)
465 {
466 _maxBinaryMessageSize=size;
467 }
468
469
470 public int getMaxIdleTime()
471 {
472 return _endp.getMaxIdleTime();
473 }
474
475
476 public int getMaxTextMessageSize()
477 {
478 return _maxTextMessageSize;
479 }
480
481
482 public int getMaxBinaryMessageSize()
483 {
484 return _maxBinaryMessageSize;
485 }
486
487
488 public String getProtocol()
489 {
490 return _protocol;
491 }
492
493
494 public byte binaryOpcode()
495 {
496 return OP_BINARY;
497 }
498
499
500 public byte textOpcode()
501 {
502 return OP_TEXT;
503 }
504
505
506 public byte continuationOpcode()
507 {
508 return OP_CONTINUATION;
509 }
510
511
512 public byte finMask()
513 {
514 return FLAG_FIN;
515 }
516
517
518 public boolean isControl(byte opcode)
519 {
520 return isControlFrame(opcode);
521 }
522
523
524 public boolean isText(byte opcode)
525 {
526 return opcode==OP_TEXT;
527 }
528
529
530 public boolean isBinary(byte opcode)
531 {
532 return opcode==OP_BINARY;
533 }
534
535
536 public boolean isContinuation(byte opcode)
537 {
538 return opcode==OP_CONTINUATION;
539 }
540
541
542 public boolean isClose(byte opcode)
543 {
544 return opcode==OP_CLOSE;
545 }
546
547
548 public boolean isPing(byte opcode)
549 {
550 return opcode==OP_PING;
551 }
552
553
554 public boolean isPong(byte opcode)
555 {
556 return opcode==OP_PONG;
557 }
558
559
560 public void disconnect()
561 {
562 close();
563 }
564
565
566 public void close()
567 {
568 close(CLOSE_NORMAL,null);
569 }
570
571
572 public void setAllowFrameFragmentation(boolean allowFragmentation)
573 {
574 _parser.setFakeFragments(allowFragmentation);
575 }
576
577
578 public boolean isAllowFrameFragmentation()
579 {
580 return _parser.isFakeFragments();
581 }
582
583
584 @Override
585 public String toString()
586 {
587 return this.getClass().getSimpleName()+"D08@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
588 }
589 }
590
591
592
593
594 private class WSFrameHandler implements WebSocketParser.FrameHandler
595 {
596 private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
597 private ByteArrayBuffer _aggregate;
598 private byte _opcode=-1;
599
600 public void onFrame(final byte flags, final byte opcode, final Buffer buffer)
601 {
602 boolean lastFrame = isLastFrame(flags);
603
604 synchronized(WebSocketConnectionD08.this)
605 {
606
607 if (_closedIn)
608 return;
609 }
610 try
611 {
612 byte[] array=buffer.array();
613
614
615 if (_onFrame!=null)
616 {
617 if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
618 return;
619 }
620
621 if (_onControl!=null && isControlFrame(opcode))
622 {
623 if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
624 return;
625 }
626
627 switch(opcode)
628 {
629 case WebSocketConnectionD08.OP_CONTINUATION:
630 {
631
632 if (_onTextMessage!=null && _opcode==WebSocketConnectionD08.OP_TEXT)
633 {
634 if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
635 {
636
637 if (lastFrame)
638 {
639 _opcode=-1;
640 String msg =_utf8.toString();
641 _utf8.reset();
642 _onTextMessage.onMessage(msg);
643 }
644 }
645 else
646 textMessageTooLarge();
647 }
648
649 if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
650 {
651 if (checkBinaryMessageSize(_aggregate.length(),buffer.length()))
652 {
653 _aggregate.put(buffer);
654
655
656 if (lastFrame && _onBinaryMessage!=null)
657 {
658 try
659 {
660 _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
661 }
662 finally
663 {
664 _opcode=-1;
665 _aggregate.clear();
666 }
667 }
668 }
669 }
670 break;
671 }
672 case WebSocketConnectionD08.OP_PING:
673 {
674 LOG.debug("PING {}",this);
675 if (!_closedOut)
676 _connection.sendControl(WebSocketConnectionD08.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
677 break;
678 }
679
680 case WebSocketConnectionD08.OP_PONG:
681 {
682 LOG.debug("PONG {}",this);
683 break;
684 }
685
686 case WebSocketConnectionD08.OP_CLOSE:
687 {
688 int code=WebSocketConnectionD08.CLOSE_NOCODE;
689 String message=null;
690 if (buffer.length()>=2)
691 {
692 code=buffer.array()[buffer.getIndex()]*0x100+buffer.array()[buffer.getIndex()+1];
693 if (buffer.length()>2)
694 message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
695 }
696 closeIn(code,message);
697 break;
698 }
699
700 case WebSocketConnectionD08.OP_TEXT:
701 {
702 if(_onTextMessage!=null)
703 {
704 if (_connection.getMaxTextMessageSize()<=0)
705 {
706
707 if (lastFrame)
708 _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
709 else
710 {
711 LOG.warn("Frame discarded. Text aggregation disabled for {}",_endp);
712 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Text frame aggregation disabled");
713 }
714 }
715
716 else if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
717 {
718 if (lastFrame)
719 {
720 String msg =_utf8.toString();
721 _utf8.reset();
722 _onTextMessage.onMessage(msg);
723 }
724 else
725 {
726 _opcode=WebSocketConnectionD08.OP_TEXT;
727 }
728 }
729 else
730 textMessageTooLarge();
731 }
732 break;
733 }
734
735 default:
736 {
737 if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length()))
738 {
739 if (lastFrame)
740 {
741 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
742 }
743 else if (_connection.getMaxBinaryMessageSize()>=0)
744 {
745 _opcode=opcode;
746 if (_aggregate==null)
747 _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
748 _aggregate.put(buffer);
749 }
750 else
751 {
752 LOG.warn("Frame discarded. Binary aggregation disabed for {}",_endp);
753 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Binary frame aggregation disabled");
754 }
755 }
756 }
757 }
758 }
759 catch(Throwable th)
760 {
761 LOG.warn(th);
762 }
763 }
764
765 private boolean checkBinaryMessageSize(int bufferLen, int length)
766 {
767 int max = _connection.getMaxBinaryMessageSize();
768 if (max>0 && (bufferLen+length)>max)
769 {
770 LOG.warn("Binary message too large > {}B for {}",_connection.getMaxBinaryMessageSize(),_endp);
771 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Message size > "+_connection.getMaxBinaryMessageSize());
772 _opcode=-1;
773 if (_aggregate!=null)
774 _aggregate.clear();
775 return false;
776 }
777 return true;
778 }
779
780 private void textMessageTooLarge()
781 {
782 LOG.warn("Text message too large > {} chars for {}",_connection.getMaxTextMessageSize(),_endp);
783 _connection.close(WebSocketConnectionD08.CLOSE_BADDATA,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
784
785 _opcode=-1;
786 _utf8.reset();
787 }
788
789 public void close(int code,String message)
790 {
791 if (code!=CLOSE_NORMAL)
792 LOG.warn("Close: "+code+" "+message);
793 _connection.close(code,message);
794 }
795
796 @Override
797 public String toString()
798 {
799 return WebSocketConnectionD08.this.toString()+"FH";
800 }
801 }
802
803
804 public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException
805 {
806 String key = request.getHeader("Sec-WebSocket-Key");
807
808 response.setHeader("Upgrade", "WebSocket");
809 response.addHeader("Connection","Upgrade");
810 response.addHeader("Sec-WebSocket-Accept",hashKey(key));
811 if (subprotocol!=null)
812 response.addHeader("Sec-WebSocket-Protocol",subprotocol);
813
814 for(Extension ext : _extensions)
815 response.addHeader("Sec-WebSocket-Extensions",ext.getParameterizedName());
816
817 response.sendError(101);
818
819 if (_onFrame!=null)
820 _onFrame.onHandshake(_connection);
821 _webSocket.onOpen(_connection);
822 }
823
824
825 public static String hashKey(String key)
826 {
827 try
828 {
829 MessageDigest md = MessageDigest.getInstance("SHA1");
830 md.update(key.getBytes("UTF-8"));
831 md.update(MAGIC);
832 return new String(B64Code.encode(md.digest()));
833 }
834 catch (Exception e)
835 {
836 throw new RuntimeException(e);
837 }
838 }
839
840
841 @Override
842 public String toString()
843 {
844 return "WS/D"+_draft+"-"+_endp;
845 }
846 }