View Javadoc

1   /*******************************************************************************
2    * Copyright (c) 2011 Intalio, Inc.
3    * ======================================================================
4    * All rights reserved. This program and the accompanying materials
5    * are made available under the terms of the Eclipse Public License v1.0
6    * and Apache License v2.0 which accompanies this distribution.
7    *
8    *   The Eclipse Public License is available at
9    *   http://www.eclipse.org/legal/epl-v10.html
10   *
11   *   The Apache License v2.0 is available at
12   *   http://www.opensource.org/licenses/apache2.0.php
13   *
14   * You may elect to redistribute this code under either of these licenses.
15   *******************************************************************************/
16  // ========================================================================
17  // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
18  // ------------------------------------------------------------------------
19  // All rights reserved. This program and the accompanying materials
20  // are made available under the terms of the Eclipse Public License v1.0
21  // and Apache License v2.0 which accompanies this distribution.
22  // The Eclipse Public License is available at
23  // http://www.eclipse.org/legal/epl-v10.html
24  // The Apache License v2.0 is available at
25  // http://www.opensource.org/licenses/apache2.0.php
26  // You may elect to redistribute this code under either of these licenses.
27  // ========================================================================
28  
29  package org.eclipse.jetty.websocket;
30  
31  import java.io.IOException;
32  import java.io.UnsupportedEncodingException;
33  import java.security.MessageDigest;
34  import java.util.Collections;
35  import java.util.List;
36  
37  import org.eclipse.jetty.io.AbstractConnection;
38  import org.eclipse.jetty.io.AsyncEndPoint;
39  import org.eclipse.jetty.io.Buffer;
40  import org.eclipse.jetty.io.ByteArrayBuffer;
41  import org.eclipse.jetty.io.Connection;
42  import org.eclipse.jetty.io.EndPoint;
43  import org.eclipse.jetty.util.B64Code;
44  import org.eclipse.jetty.util.StringUtil;
45  import org.eclipse.jetty.util.Utf8Appendable;
46  import org.eclipse.jetty.util.Utf8StringBuilder;
47  import org.eclipse.jetty.util.log.Log;
48  import org.eclipse.jetty.util.log.Logger;
49  import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
50  import org.eclipse.jetty.websocket.WebSocket.OnControl;
51  import org.eclipse.jetty.websocket.WebSocket.OnFrame;
52  import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
53  
54  
55  /* ------------------------------------------------------------ */
56  /**
57   * <pre>
58   *    0                   1                   2                   3
59   *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
60   *   +-+-+-+-+-------+-+-------------+-------------------------------+
61   *   |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
62   *   |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
63   *   |N|V|V|V|       |S|             |   (if payload len==126/127)   |
64   *   | |1|2|3|       |K|             |                               |
65   *   +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
66   *   |     Extended payload length continued, if payload len == 127  |
67   *   + - - - - - - - - - - - - - - - +-------------------------------+
68   *   |                               |Masking-key, if MASK set to 1  |
69   *   +-------------------------------+-------------------------------+
70   *   | Masking-key (continued)       |          Payload Data         |
71   *   +-------------------------------- - - - - - - - - - - - - - - - +
72   *   :                     Payload Data continued ...                :
73   *   + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
74   *   |                     Payload Data continued ...                |
75   *   +---------------------------------------------------------------+
76   * </pre>
77   */
78  public class WebSocketConnectionRFC6455 extends AbstractConnection implements WebSocketConnection
79  {
    // Class-wide logger.
    private static final Logger LOG = Log.getLogger(WebSocketConnectionRFC6455.class);

    // Data frame opcodes (low four bits of the first frame byte, see the
    // frame layout diagram in the class comment).
    final static byte OP_CONTINUATION = 0x00;
    final static byte OP_TEXT = 0x01;
    final static byte OP_BINARY = 0x02;
    final static byte OP_EXT_DATA = 0x03;

    // Control frame opcodes. OP_CONTROL doubles as the bit mask tested by
    // isControlFrame(byte); it has the same value as OP_CLOSE.
    final static byte OP_CONTROL = 0x08;
    final static byte OP_CLOSE = 0x08;
    final static byte OP_PING = 0x09;
    final static byte OP_PONG = 0x0A;
    final static byte OP_EXT_CTRL = 0x0B;

    // Close status codes. closeOut(int,String) never puts CLOSE_NO_CODE,
    // CLOSE_NO_CLOSE or CLOSE_FAILED_TLS_HANDSHAKE on the wire; they are only
    // used locally.
    final static int CLOSE_NORMAL=1000;
    final static int CLOSE_SHUTDOWN=1001;
    final static int CLOSE_PROTOCOL=1002;
    final static int CLOSE_BAD_DATA=1003;
    final static int CLOSE_UNDEFINED=1004;
    final static int CLOSE_NO_CODE=1005;
    final static int CLOSE_NO_CLOSE=1006;
    final static int CLOSE_BAD_PAYLOAD=1007;
    final static int CLOSE_POLICY_VIOLATION=1008;
    final static int CLOSE_MESSAGE_TOO_LARGE=1009;
    final static int CLOSE_REQUIRED_EXTENSION=1010;
    final static int CLOSE_SERVER_ERROR=1011;
    final static int CLOSE_FAILED_TLS_HANDSHAKE=1015;

    // Bit within the "flags" byte that marks the final frame of a message
    // (tested by isLastFrame(byte)). The low three bits (0x7) of the same
    // byte are treated as the RSV bits by the frame handler.
    final static int FLAG_FIN=0x8;

    // Per RFC 6455, section 1.3 - Opening Handshake - this version is "13"
    final static int VERSION=13;
111 
112     static boolean isLastFrame(byte flags)
113     {
114         return (flags&FLAG_FIN)!=0;
115     }
116 
117     static boolean isControlFrame(byte opcode)
118     {
119         return (opcode&OP_CONTROL)!=0;
120     }
121 
    // Bytes of the handshake GUID; assigned once in the static initializer below.
    private final static byte[] MAGIC;
    // Extension chain supplied to the constructor; may be null.
    private final List<Extension> _extensions;
    // Inbound frame parser, fed from the endpoint.
    private final WebSocketParserD13 _parser;
    // Generator that writes frames to the endpoint.
    private final WebSocketGeneratorRFC6455 _generator;
    // Where outgoing frames are handed: the last extension if any, else _generator.
    private final WebSocketGenerator _outbound;
    // The application websocket and its optional capability views
    // (each view is null when _webSocket does not implement that interface).
    private final WebSocket _webSocket;
    private final OnFrame _onFrame;
    private final OnBinaryMessage _onBinaryMessage;
    private final OnTextMessage _onTextMessage;
    private final OnControl _onControl;
    // Sub-protocol name passed to the constructor; exposed via getProtocol().
    private final String _protocol;
    // Draft/version number passed to the constructor.
    private final int _draft;
    // Context class loader captured at construction; re-installed during handle().
    private final ClassLoader _context;
    // Close state. _closeCode==0 means no close has been recorded yet; the
    // transitions are made while synchronized on this connection.
    private volatile int _closeCode;
    private volatile String _closeMessage;
    private volatile boolean _closedIn;
    private volatile boolean _closedOut;
    // Message size limits, adjustable through the FrameConnection setters.
    // NOTE(review): -1 appears to mean "aggregation disabled / no limit" - confirm against the frame handler.
    private int _maxTextMessageSize=-1;
    private int _maxBinaryMessageSize=-1;
141 
142     static
143     {
144         try
145         {
146             MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
147         }
148         catch (UnsupportedEncodingException e)
149         {
150             throw new RuntimeException(e);
151         }
152     }
153 
    // The connection facade handed to the application websocket and to extensions.
    private final WebSocket.FrameConnection _connection = new WSFrameConnection();
155 
156 
157     /* ------------------------------------------------------------ */
158     public WebSocketConnectionRFC6455(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft)
159         throws IOException
160     {
161         this(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol,extensions,draft,null);
162     }
163 
164     /* ------------------------------------------------------------ */
165     public WebSocketConnectionRFC6455(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft, MaskGen maskgen)
166         throws IOException
167     {
168         super(endpoint,timestamp);
169 
170         _context=Thread.currentThread().getContextClassLoader();
171 
172         _draft=draft;
173         _endp.setMaxIdleTime(maxIdleTime);
174 
175         _webSocket = websocket;
176         _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
177         _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
178         _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
179         _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
180         _generator = new WebSocketGeneratorRFC6455(buffers, _endp,maskgen);
181 
182         _extensions=extensions;
183         WebSocketParser.FrameHandler frameHandler = new WSFrameHandler();
184         if (_extensions!=null)
185         {
186             int e=0;
187             for (Extension extension : _extensions)
188             {
189                 extension.bind(
190                         _connection,
191                         e==extensions.size()-1? frameHandler :extensions.get(e+1),
192                         e==0?_generator:extensions.get(e-1));
193                 e++;
194             }
195         }
196 
197         _outbound=(_extensions==null||_extensions.size()==0)?_generator:extensions.get(extensions.size()-1);
198         WebSocketParser.FrameHandler inbound = (_extensions == null || _extensions.size() == 0) ? frameHandler : extensions.get(0);
199 
200         _parser = new WebSocketParserD13(buffers, endpoint, inbound,maskgen==null);
201 
202         _protocol=protocol;
203 
204     }
205 
206     /* ------------------------------------------------------------ */
207     public WebSocket.Connection getConnection()
208     {
209         return _connection;
210     }
211 
212     /* ------------------------------------------------------------ */
213     public List<Extension> getExtensions()
214     {
215         if (_extensions==null)
216             return Collections.emptyList();
217 
218         return _extensions;
219     }
220 
221     /* ------------------------------------------------------------ */
    /**
     * Drives the framing protocol: repeatedly flushes pending output and
     * parses available input until neither makes progress.
     * <p>
     * The context class loader captured at construction is installed for the
     * duration of the call and restored afterwards.
     *
     * @return this connection
     * @throws IOException if flushing or parsing fails; the endpoint is
     *         closed (best effort) before the exception is rethrown
     */
    public Connection handle() throws IOException
    {
        Thread current = Thread.currentThread();
        ClassLoader oldcontext = current.getContextClassLoader();
        current.setContextClassLoader(_context);
        try
        {
            // handle the framing protocol
            boolean progress=true;

            while (progress)
            {
                int flushed=_generator.flushBuffer();
                int filled=_parser.parseNext();

                // Keep looping while either direction reported a positive count...
                progress = flushed>0 || filled>0;
                _endp.flush();

                // ...or while an async endpoint reports progress of its own.
                if (_endp instanceof AsyncEndPoint && ((AsyncEndPoint)_endp).hasProgressed())
                    progress=true;
            }
        }
        catch(IOException e)
        {
            // Best-effort close of the endpoint; a failure while closing is
            // ignored so that the original exception is the one propagated.
            try
            {
                if (_endp.isOpen())
                    _endp.close();
            }
            catch(IOException e2)
            {
                LOG.ignore(e2);
            }
            throw e;
        }
        finally
        {
            current.setContextClassLoader(oldcontext);
            // Hand the parser and generator buffers back.
            _parser.returnBuffer();
            _generator.returnBuffer();
            if (_endp.isOpen())
            {
                // Both directions closed and nothing left to send: close the endpoint.
                if (_closedIn && _closedOut && _outbound.isBufferEmpty())
                    _endp.close();
                // Input was shut down without a close having been recorded
                // for the inbound side: treat it as an abnormal close.
                else if (_endp.isInputShutdown() && !_closedIn)
                    closeIn(CLOSE_NO_CLOSE,null);
                // Otherwise make sure any pending output gets scheduled.
                else
                    checkWriteable();
            }
        }
        return this;
    }
274 
275     /* ------------------------------------------------------------ */
276     public void onInputShutdown() throws IOException
277     {
278         if (!_closedIn)
279             _endp.close();
280     }
281 
282     /* ------------------------------------------------------------ */
283     public boolean isIdle()
284     {
285         return _parser.isBufferEmpty() && _outbound.isBufferEmpty();
286     }
287 
288     /* ------------------------------------------------------------ */
289     @Override
290     public void onIdleExpired(long idleForMs)
291     {
292         closeOut(WebSocketConnectionRFC6455.CLOSE_NORMAL,"Idle for "+idleForMs+"ms > "+_endp.getMaxIdleTime()+"ms");
293     }
294 
295     /* ------------------------------------------------------------ */
296     public boolean isSuspended()
297     {
298         return false;
299     }
300 
301     /* ------------------------------------------------------------ */
302     public void onClose()
303     {
304         final boolean closed;
305         synchronized (this)
306         {
307             closed=_closeCode==0;
308             if (closed)
309                 _closeCode=WebSocketConnectionRFC6455.CLOSE_NO_CLOSE;
310         }
311         if (closed)
312             _webSocket.onClose(WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"closed");
313     }
314 
315     /* ------------------------------------------------------------ */
316     public void closeIn(int code,String message)
317     {
318         LOG.debug("ClosedIn {} {} {}",this,code,message);
319 
320         final boolean closed_out;
321         final boolean tell_app;
322         synchronized (this)
323         {
324             closed_out=_closedOut;
325             _closedIn=true;
326             tell_app=_closeCode==0;
327             if (tell_app)
328             {
329                 _closeCode=code;
330                 _closeMessage=message;
331             }
332         }
333 
334         try
335         {
336             if (tell_app)
337                 _webSocket.onClose(code,message);
338         }
339         finally
340         {
341             if (!closed_out)
342                 closeOut(code,message);
343         }
344     }
345 
346     /* ------------------------------------------------------------ */
347     public void closeOut(int code,String message)
348     {
349         LOG.debug("ClosedOut {} {} {}",this,code,message);
350 
351         final boolean closed_out;
352         final boolean tell_app;
353         synchronized (this)
354         {
355             closed_out=_closedOut;
356             _closedOut=true;
357             tell_app=_closeCode==0;
358             if (tell_app)
359             {
360                 _closeCode=code;
361                 _closeMessage=message;
362             }
363         }
364 
365         try
366         {
367             if (tell_app)
368                 _webSocket.onClose(code,message);
369         }
370         finally
371         {
372             try
373             {
374                 if (!closed_out)
375                 {
376                     // Close code 1005/1006/1015 are never to be sent as a status over
377                     // a Close control frame. Code<-1 also means no node.
378 
379                     if (code < 0 || (code == WebSocketConnectionRFC6455.CLOSE_NO_CODE) || (code == WebSocketConnectionRFC6455.CLOSE_NO_CLOSE)
380                             || (code == WebSocketConnectionRFC6455.CLOSE_FAILED_TLS_HANDSHAKE))
381                     {
382                         code = -1;
383                     }
384                     else if (code == 0)
385                     {
386                         code = WebSocketConnectionRFC6455.CLOSE_NORMAL;
387                     }
388 
389                     byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
390                     bytes[0]=(byte)(code/0x100);
391                     bytes[1]=(byte)(code%0x100);
392                     _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionRFC6455.OP_CLOSE,bytes,0,code>0?bytes.length:0);
393                     _outbound.flush();
394                 }
395             }
396             catch(IOException e)
397             {
398                 LOG.ignore(e);
399             }
400         }
401     }
402 
403     public void shutdown()
404     {
405         final WebSocket.Connection connection = _connection;
406         if (connection != null)
407             connection.close(CLOSE_SHUTDOWN, null);
408     }
409 
410     /* ------------------------------------------------------------ */
411     public void fillBuffersFrom(Buffer buffer)
412     {
413         _parser.fill(buffer);
414     }
415 
416     /* ------------------------------------------------------------ */
417     private void checkWriteable()
418     {
419         if (!_outbound.isBufferEmpty() && _endp instanceof AsyncEndPoint)
420         {
421             ((AsyncEndPoint)_endp).scheduleWrite();
422         }
423     }
424 
425     protected void onFrameHandshake()
426     {
427         if (_onFrame != null)
428         {
429             _onFrame.onHandshake(_connection);
430         }
431     }
432 
433     protected void onWebSocketOpen()
434     {
435         _webSocket.onOpen(_connection);
436     }
437 
438     /* ------------------------------------------------------------ */
439     private class WSFrameConnection implements WebSocket.FrameConnection
440     {
441         private volatile boolean _disconnecting;
442 
443         /* ------------------------------------------------------------ */
444         public void sendMessage(String content) throws IOException
445         {
446             if (_closedOut)
447                 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
448             byte[] data = content.getBytes(StringUtil.__UTF8);
449             _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionRFC6455.OP_TEXT,data,0,data.length);
450             checkWriteable();
451         }
452 
453         /* ------------------------------------------------------------ */
454         public void sendMessage(byte[] content, int offset, int length) throws IOException
455         {
456             if (_closedOut)
457                 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
458             _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionRFC6455.OP_BINARY,content,offset,length);
459             checkWriteable();
460         }
461 
462         /* ------------------------------------------------------------ */
463         public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
464         {
465             if (_closedOut)
466                 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
467             _outbound.addFrame(flags,opcode,content,offset,length);
468             checkWriteable();
469         }
470 
471         /* ------------------------------------------------------------ */
472         public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException
473         {
474             // TODO: section 5.5 states that control frames MUST never be length > 125 bytes and MUST NOT be fragmented
475             if (_closedOut)
476                 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
477             _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
478             checkWriteable();
479         }
480 
481         /* ------------------------------------------------------------ */
482         public boolean isMessageComplete(byte flags)
483         {
484             return isLastFrame(flags);
485         }
486 
487         /* ------------------------------------------------------------ */
488         public boolean isOpen()
489         {
490             return _endp!=null&&_endp.isOpen();
491         }
492 
493         /* ------------------------------------------------------------ */
494         public void close(int code, String message)
495         {
496             if (_disconnecting)
497                 return;
498             _disconnecting=true;
499             WebSocketConnectionRFC6455.this.closeOut(code,message);
500         }
501 
502         /* ------------------------------------------------------------ */
503         public void setMaxIdleTime(int ms)
504         {
505             try
506             {
507                 _endp.setMaxIdleTime(ms);
508             }
509             catch(IOException e)
510             {
511                 LOG.warn(e);
512             }
513         }
514 
515         /* ------------------------------------------------------------ */
516         public void setMaxTextMessageSize(int size)
517         {
518             _maxTextMessageSize=size;
519         }
520 
521         /* ------------------------------------------------------------ */
522         public void setMaxBinaryMessageSize(int size)
523         {
524             _maxBinaryMessageSize=size;
525         }
526 
527         /* ------------------------------------------------------------ */
528         public int getMaxIdleTime()
529         {
530             return _endp.getMaxIdleTime();
531         }
532 
533         /* ------------------------------------------------------------ */
534         public int getMaxTextMessageSize()
535         {
536             return _maxTextMessageSize;
537         }
538 
539         /* ------------------------------------------------------------ */
540         public int getMaxBinaryMessageSize()
541         {
542             return _maxBinaryMessageSize;
543         }
544 
545         /* ------------------------------------------------------------ */
546         public String getProtocol()
547         {
548             return _protocol;
549         }
550 
551         /* ------------------------------------------------------------ */
552         public byte binaryOpcode()
553         {
554             return OP_BINARY;
555         }
556 
557         /* ------------------------------------------------------------ */
558         public byte textOpcode()
559         {
560             return OP_TEXT;
561         }
562 
563         /* ------------------------------------------------------------ */
564         public byte continuationOpcode()
565         {
566             return OP_CONTINUATION;
567         }
568 
569         /* ------------------------------------------------------------ */
570         public byte finMask()
571         {
572             return FLAG_FIN;
573         }
574 
575         /* ------------------------------------------------------------ */
576         public boolean isControl(byte opcode)
577         {
578             return isControlFrame(opcode);
579         }
580 
581         /* ------------------------------------------------------------ */
582         public boolean isText(byte opcode)
583         {
584             return opcode==OP_TEXT;
585         }
586 
587         /* ------------------------------------------------------------ */
588         public boolean isBinary(byte opcode)
589         {
590             return opcode==OP_BINARY;
591         }
592 
593         /* ------------------------------------------------------------ */
594         public boolean isContinuation(byte opcode)
595         {
596             return opcode==OP_CONTINUATION;
597         }
598 
599         /* ------------------------------------------------------------ */
600         public boolean isClose(byte opcode)
601         {
602             return opcode==OP_CLOSE;
603         }
604 
605         /* ------------------------------------------------------------ */
606         public boolean isPing(byte opcode)
607         {
608             return opcode==OP_PING;
609         }
610 
611         /* ------------------------------------------------------------ */
612         public boolean isPong(byte opcode)
613         {
614             return opcode==OP_PONG;
615         }
616 
617         /* ------------------------------------------------------------ */
618         public void disconnect()
619         {
620             close(CLOSE_NORMAL,null);
621         }
622 
623         /* ------------------------------------------------------------ */
624         public void close()
625         {
626             close(CLOSE_NORMAL,null);
627         }
628 
629         /* ------------------------------------------------------------ */
630         public void setAllowFrameFragmentation(boolean allowFragmentation)
631         {
632             _parser.setFakeFragments(allowFragmentation);
633         }
634 
635         /* ------------------------------------------------------------ */
636         public boolean isAllowFrameFragmentation()
637         {
638             return _parser.isFakeFragments();
639         }
640 
641         /* ------------------------------------------------------------ */
642         @Override
643         public String toString()
644         {
645             return this.getClass().getSimpleName()+"D13@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
646         }
647     }
648 
649     /* ------------------------------------------------------------ */
650     /* ------------------------------------------------------------ */
651     /* ------------------------------------------------------------ */
652     private class WSFrameHandler implements WebSocketParser.FrameHandler
653     {
654         private static final int MAX_CONTROL_FRAME_PAYLOAD = 125;
655         private final Utf8StringBuilder _utf8 = new Utf8StringBuilder(512); // TODO configure initial capacity
656         private ByteArrayBuffer _aggregate;
657         private byte _opcode=-1;
658 
659         public void onFrame(final byte flags, final byte opcode, final Buffer buffer)
660         {
661             boolean lastFrame = isLastFrame(flags);
662 
663             synchronized(WebSocketConnectionRFC6455.this)
664             {
665                 // Ignore incoming after a close
666                 if (_closedIn)
667                     return;
668             }
669             try
670             {
671                 byte[] array=buffer.array();
672 
673                 if (isControlFrame(opcode) && buffer.length()>MAX_CONTROL_FRAME_PAYLOAD)
674                 {
675                     errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Control frame too large: " + buffer.length() + " > " + MAX_CONTROL_FRAME_PAYLOAD);
676                     return;
677                 }
678 
679                 // TODO: check extensions for RSV bit(s) meanings
680                 if ((flags&0x7)!=0)
681                 {
682                     errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"RSV bits set 0x"+Integer.toHexString(flags));
683                     return;
684                 }
685 
686                 // Ignore all frames after error close
687                 if (_closeCode!=0 && _closeCode!=CLOSE_NORMAL && opcode!=OP_CLOSE)
688                 {
689                     return;
690                 }
691 
692                 // Deliver frame if websocket is a FrameWebSocket
693                 if (_onFrame!=null)
694                 {
695                     if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
696                         return;
697                 }
698 
699                 if (_onControl!=null && isControlFrame(opcode))
700                 {
701                     if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
702                         return;
703                 }
704 
705                 switch(opcode)
706                 {
707                     case WebSocketConnectionRFC6455.OP_CONTINUATION:
708                     {
709                         if (_opcode==-1)
710                         {
711                             errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Bad Continuation");
712                             return;
713                         }
714 
715                         // If text, append to the message buffer
716                         if (_onTextMessage!=null && _opcode==WebSocketConnectionRFC6455.OP_TEXT)
717                         {
718                             if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
719                             {
720                                 // If this is the last fragment, deliver the text buffer
721                                 if (lastFrame)
722                                 {
723                                     _opcode=-1;
724                                     String msg =_utf8.toString();
725                                     _utf8.reset();
726                                     _onTextMessage.onMessage(msg);
727                                 }
728                             }
729                             else
730                                 textMessageTooLarge();
731                         }
732 
733                         if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
734                         {
735                             if (_aggregate!=null && checkBinaryMessageSize(_aggregate.length(),buffer.length()))
736                             {
737                                 _aggregate.put(buffer);
738 
739                                 // If this is the last fragment, deliver
740                                 if (lastFrame && _onBinaryMessage!=null)
741                                 {
742                                     try
743                                     {
744                                         _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
745                                     }
746                                     finally
747                                     {
748                                         _opcode=-1;
749                                         _aggregate.clear();
750                                     }
751                                 }
752                             }
753                         }
754                         break;
755                     }
756                     case WebSocketConnectionRFC6455.OP_PING:
757                     {
758                         LOG.debug("PING {}",this);
759                         if (!_closedOut)
760                         {
761                             _connection.sendControl(WebSocketConnectionRFC6455.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
762                         }
763                         break;
764                     }
765 
766                     case WebSocketConnectionRFC6455.OP_PONG:
767                     {
768                         LOG.debug("PONG {}",this);
769                         break;
770                     }
771 
772                     case WebSocketConnectionRFC6455.OP_CLOSE:
773                     {
774                         int code=WebSocketConnectionRFC6455.CLOSE_NO_CODE;
775                         String message=null;
776                         if (buffer.length()>=2)
777                         {
778                             code=(0xff&buffer.array()[buffer.getIndex()])*0x100+(0xff&buffer.array()[buffer.getIndex()+1]);
779 
780                             // Validate close status codes.
781                             if (code < WebSocketConnectionRFC6455.CLOSE_NORMAL ||
782                                 code == WebSocketConnectionRFC6455.CLOSE_UNDEFINED ||
783                                 code == WebSocketConnectionRFC6455.CLOSE_NO_CLOSE ||
784                                 code == WebSocketConnectionRFC6455.CLOSE_NO_CODE ||
785                                 ( code > 1011 && code <= 2999 ) ||
786                                 code >= 5000 )
787                             {
788                                 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Invalid close code " + code);
789                                 return;
790                             }
791 
792                             if (buffer.length()>2)
793                             {
794                                 if(_utf8.append(buffer.array(),buffer.getIndex()+2,buffer.length()-2,_connection.getMaxTextMessageSize()))
795                                 {
796                                     message = _utf8.toString();
797                                     _utf8.reset();
798                                 }
799                             }
800                         }
801                         else if(buffer.length() == 1)
802                         {
803                             // Invalid length. use status code 1002 (Protocol error)
804                             errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Invalid payload length of 1");
805                             return;
806                         }
807                         closeIn(code,message);
808                         break;
809                     }
810 
811                     case WebSocketConnectionRFC6455.OP_TEXT:
812                     {
813                         if (_opcode!=-1)
814                         {
815                             errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode));
816                             return;
817                         }
818 
819                         if(_onTextMessage!=null)
820                         {
821                             if (_connection.getMaxTextMessageSize()<=0)
822                             {
823                                 // No size limit, so handle only final frames
824                                 if (lastFrame)
825                                     _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
826                                 else
827                                 {
828                                     LOG.warn("Frame discarded. Text aggregation disabled for {}",_endp);
829                                     errorClose(WebSocketConnectionRFC6455.CLOSE_POLICY_VIOLATION,"Text frame aggregation disabled");
830                                 }
831                             }
832                             // append bytes to message buffer (if they fit)
833                             else if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
834                             {
835                                 if (lastFrame)
836                                 {
837                                     String msg =_utf8.toString();
838                                     _utf8.reset();
839                                     _onTextMessage.onMessage(msg);
840                                 }
841                                 else
842                                 {
843                                     _opcode=WebSocketConnectionRFC6455.OP_TEXT;
844                                 }
845                             }
846                             else
847                                 textMessageTooLarge();
848                         }
849                         break;
850                     }
851 
852                     case WebSocketConnectionRFC6455.OP_BINARY:
853                     {
854                         if (_opcode!=-1)
855                         {
856                             errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode));
857                             return;
858                         }
859 
860                         if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length()))
861                         {
862                             if (lastFrame)
863                             {
864                                 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
865                             }
866                             else if (_connection.getMaxBinaryMessageSize()>=0)
867                             {
868                                 _opcode=opcode;
869                                 // TODO use a growing buffer rather than a fixed one.
870                                 if (_aggregate==null)
871                                     _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
872                                 _aggregate.put(buffer);
873                             }
874                             else
875                             {
876                                 LOG.warn("Frame discarded. Binary aggregation disabed for {}",_endp);
877                                 errorClose(WebSocketConnectionRFC6455.CLOSE_POLICY_VIOLATION,"Binary frame aggregation disabled");
878                             }
879                         }
880                         break;
881                     }
882 
883                     default:
884                         errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Bad opcode 0x"+Integer.toHexString(opcode));
885                         break;
886                 }
887             }
888             catch(Utf8Appendable.NotUtf8Exception notUtf8)
889             {
890                 LOG.warn("NOTUTF8 - {} for {}",notUtf8,_endp, notUtf8);
891                 LOG.debug(notUtf8);
892                 errorClose(WebSocketConnectionRFC6455.CLOSE_BAD_PAYLOAD,"Invalid UTF-8");
893             }
894             catch(Throwable e)
895             {
896                 LOG.warn("{} for {}",e,_endp, e);
897                 LOG.debug(e);
898                 errorClose(WebSocketConnectionRFC6455.CLOSE_SERVER_ERROR,"Internal Server Error: "+e);
899             }
900         }
901 
902         private void errorClose(int code, String message)
903         {
904             _connection.close(code,message);
905 
906             // Brutally drop the connection
907             try
908             {
909                 _endp.close();
910             }
911             catch (IOException e)
912             {
913                 LOG.warn(e.toString());
914                 LOG.debug(e);
915             }
916         }
917 
918         private boolean checkBinaryMessageSize(int bufferLen, int length)
919         {
920             int max = _connection.getMaxBinaryMessageSize();
921             if (max>0 && (bufferLen+length)>max)
922             {
923                 LOG.warn("Binary message too large > {}B for {}",_connection.getMaxBinaryMessageSize(),_endp);
924                 _connection.close(WebSocketConnectionRFC6455.CLOSE_MESSAGE_TOO_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
925                 _opcode=-1;
926                 if (_aggregate!=null)
927                     _aggregate.clear();
928                 return false;
929             }
930             return true;
931         }
932 
933         private void textMessageTooLarge()
934         {
935             LOG.warn("Text message too large > {} chars for {}",_connection.getMaxTextMessageSize(),_endp);
936             _connection.close(WebSocketConnectionRFC6455.CLOSE_MESSAGE_TOO_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
937 
938             _opcode=-1;
939             _utf8.reset();
940         }
941 
942         public void close(int code,String message)
943         {
944             if (code!=CLOSE_NORMAL)
945                 LOG.warn("Close: "+code+" "+message);
946             _connection.close(code,message);
947         }
948 
949         @Override
950         public String toString()
951         {
952             return WebSocketConnectionRFC6455.this.toString()+"FH";
953         }
954     }
955 
956     /* ------------------------------------------------------------ */
957     public static String hashKey(String key)
958     {
959         try
960         {
961             MessageDigest md = MessageDigest.getInstance("SHA1");
962             md.update(key.getBytes("UTF-8"));
963             md.update(MAGIC);
964             return new String(B64Code.encode(md.digest()));
965         }
966         catch (Exception e)
967         {
968             throw new RuntimeException(e);
969         }
970     }
971 
972     /* ------------------------------------------------------------ */
973     @Override
974     public String toString()
975     {
976         return String.format("WS/D%d p=%s g=%s", _draft, _parser, _generator);
977     }
978 }