View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.websocket;
20  
21  import java.io.IOException;
22  import java.io.UnsupportedEncodingException;
23  import java.security.MessageDigest;
24  import java.util.Collections;
25  import java.util.List;
26  
27  import org.eclipse.jetty.io.AbstractConnection;
28  import org.eclipse.jetty.io.AsyncEndPoint;
29  import org.eclipse.jetty.io.Buffer;
30  import org.eclipse.jetty.io.ByteArrayBuffer;
31  import org.eclipse.jetty.io.Connection;
32  import org.eclipse.jetty.io.EndPoint;
33  import org.eclipse.jetty.util.B64Code;
34  import org.eclipse.jetty.util.StringUtil;
35  import org.eclipse.jetty.util.Utf8Appendable;
36  import org.eclipse.jetty.util.Utf8StringBuilder;
37  import org.eclipse.jetty.util.log.Log;
38  import org.eclipse.jetty.util.log.Logger;
39  import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
40  import org.eclipse.jetty.websocket.WebSocket.OnControl;
41  import org.eclipse.jetty.websocket.WebSocket.OnFrame;
42  import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
43  
44  
45  /* ------------------------------------------------------------ */
46  /**
47   * <pre>
48   *    0                   1                   2                   3
49   *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
50   *   +-+-+-+-+-------+-+-------------+-------------------------------+
51   *   |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
52   *   |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
53   *   |N|V|V|V|       |S|             |   (if payload len==126/127)   |
54   *   | |1|2|3|       |K|             |                               |
55   *   +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
56   *   |     Extended payload length continued, if payload len == 127  |
57   *   + - - - - - - - - - - - - - - - +-------------------------------+
58   *   |                               |Masking-key, if MASK set to 1  |
59   *   +-------------------------------+-------------------------------+
60   *   | Masking-key (continued)       |          Payload Data         |
61   *   +-------------------------------- - - - - - - - - - - - - - - - +
62   *   :                     Payload Data continued ...                :
63   *   + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
64   *   |                     Payload Data continued ...                |
65   *   +---------------------------------------------------------------+
66   * </pre>
67   */
68  public class WebSocketConnectionRFC6455 extends AbstractConnection implements WebSocketConnection
69  {
70      private static final Logger LOG = Log.getLogger(WebSocketConnectionRFC6455.class);
71  
72      final static byte OP_CONTINUATION = 0x00;
73      final static byte OP_TEXT = 0x01;
74      final static byte OP_BINARY = 0x02;
75      final static byte OP_EXT_DATA = 0x03;
76  
77      final static byte OP_CONTROL = 0x08;
78      final static byte OP_CLOSE = 0x08;
79      final static byte OP_PING = 0x09;
80      final static byte OP_PONG = 0x0A;
81      final static byte OP_EXT_CTRL = 0x0B;
82  
83      final static int CLOSE_NORMAL=1000;
84      final static int CLOSE_SHUTDOWN=1001;
85      final static int CLOSE_PROTOCOL=1002;
86      final static int CLOSE_BAD_DATA=1003;
87      final static int CLOSE_UNDEFINED=1004;
88      final static int CLOSE_NO_CODE=1005;
89      final static int CLOSE_NO_CLOSE=1006;
90      final static int CLOSE_BAD_PAYLOAD=1007;
91      final static int CLOSE_POLICY_VIOLATION=1008;
92      final static int CLOSE_MESSAGE_TOO_LARGE=1009;
93      final static int CLOSE_REQUIRED_EXTENSION=1010;
94      final static int CLOSE_SERVER_ERROR=1011;
95      final static int CLOSE_FAILED_TLS_HANDSHAKE=1015;
96  
97      final static int FLAG_FIN=0x8;
98  
99      // Per RFC 6455, section 1.3 - Opening Handshake - this version is "13"
100     final static int VERSION=13;
101 
102     static boolean isLastFrame(byte flags)
103     {
104         return (flags&FLAG_FIN)!=0;
105     }
106 
107     static boolean isControlFrame(byte opcode)
108     {
109         return (opcode&OP_CONTROL)!=0;
110     }
111 
112     private final static byte[] MAGIC;
113     private final List<Extension> _extensions;
114     private final WebSocketParserRFC6455 _parser;
115     private final WebSocketGeneratorRFC6455 _generator;
116     private final WebSocketGenerator _outbound;
117     private final WebSocket _webSocket;
118     private final OnFrame _onFrame;
119     private final OnBinaryMessage _onBinaryMessage;
120     private final OnTextMessage _onTextMessage;
121     private final OnControl _onControl;
122     private final String _protocol;
123     private final int _draft;
124     private final ClassLoader _context;
125     private volatile int _closeCode;
126     private volatile String _closeMessage;
127     private volatile boolean _closedIn;
128     private volatile boolean _closedOut;
129     private int _maxTextMessageSize=-1;
130     private int _maxBinaryMessageSize=-1;
131 
132     static
133     {
134         try
135         {
136             MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
137         }
138         catch (UnsupportedEncodingException e)
139         {
140             throw new RuntimeException(e);
141         }
142     }
143 
144     private final WebSocket.FrameConnection _connection = new WSFrameConnection();
145 
146 
147     /* ------------------------------------------------------------ */
148     public WebSocketConnectionRFC6455(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft)
149         throws IOException
150     {
151         this(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol,extensions,draft,null);
152     }
153 
154     /* ------------------------------------------------------------ */
155     public WebSocketConnectionRFC6455(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft, MaskGen maskgen)
156         throws IOException
157     {
158         super(endpoint,timestamp);
159 
160         _context=Thread.currentThread().getContextClassLoader();
161 
162         _draft=draft;
163         _endp.setMaxIdleTime(maxIdleTime);
164 
165         _webSocket = websocket;
166         _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
167         _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
168         _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
169         _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
170         _generator = new WebSocketGeneratorRFC6455(buffers, _endp,maskgen);
171 
172         _extensions=extensions;
173         WebSocketParser.FrameHandler frameHandler = new WSFrameHandler();
174         if (_extensions!=null)
175         {
176             int e=0;
177             for (Extension extension : _extensions)
178             {
179                 extension.bind(
180                         _connection,
181                         e==extensions.size()-1? frameHandler :extensions.get(e+1),
182                         e==0?_generator:extensions.get(e-1));
183                 e++;
184             }
185         }
186 
187         _outbound=(_extensions==null||_extensions.size()==0)?_generator:extensions.get(extensions.size()-1);
188         WebSocketParser.FrameHandler inbound = (_extensions == null || _extensions.size() == 0) ? frameHandler : extensions.get(0);
189 
190         _parser = new WebSocketParserRFC6455(buffers, endpoint, inbound,maskgen==null);
191 
192         _protocol=protocol;
193 
194     }
195 
196     /* ------------------------------------------------------------ */
197     public WebSocket.Connection getConnection()
198     {
199         return _connection;
200     }
201 
202     /* ------------------------------------------------------------ */
203     public List<Extension> getExtensions()
204     {
205         if (_extensions==null)
206             return Collections.emptyList();
207 
208         return _extensions;
209     }
210 
211     /* ------------------------------------------------------------ */
212     public Connection handle() throws IOException
213     {
214         Thread current = Thread.currentThread();
215         ClassLoader oldcontext = current.getContextClassLoader();
216         current.setContextClassLoader(_context);
217         try
218         {
219             // handle the framing protocol
220             boolean progress=true;
221 
222             while (progress)
223             {
224                 int flushed=_generator.flushBuffer();
225                 int filled=_parser.parseNext();
226 
227                 progress = flushed>0 || filled>0;
228                 _endp.flush();
229 
230                 if (_endp instanceof AsyncEndPoint && ((AsyncEndPoint)_endp).hasProgressed())
231                     progress=true;
232             }
233         }
234         catch(IOException e)
235         {
236             try
237             {
238                 if (_endp.isOpen())
239                     _endp.close();
240             }
241             catch(IOException e2)
242             {
243                 LOG.ignore(e2);
244             }
245             throw e;
246         }
247         finally
248         {
249             current.setContextClassLoader(oldcontext);
250             _parser.returnBuffer();
251             _generator.returnBuffer();
252             if (_endp.isOpen())
253             {
254                 if (_closedIn && _closedOut && _outbound.isBufferEmpty())
255                     _endp.close();
256                 else if (_endp.isInputShutdown() && !_closedIn)
257                     closeIn(CLOSE_NO_CLOSE,null);
258                 else
259                     checkWriteable();
260             }
261         }
262         return this;
263     }
264 
265     /* ------------------------------------------------------------ */
266     public void onInputShutdown() throws IOException
267     {
268         if (!_closedIn)
269             _endp.close();
270     }
271 
272     /* ------------------------------------------------------------ */
273     public boolean isIdle()
274     {
275         return _parser.isBufferEmpty() && _outbound.isBufferEmpty();
276     }
277 
278     /* ------------------------------------------------------------ */
279     @Override
280     public void onIdleExpired(long idleForMs)
281     {
282         closeOut(WebSocketConnectionRFC6455.CLOSE_NORMAL,"Idle for "+idleForMs+"ms > "+_endp.getMaxIdleTime()+"ms");
283     }
284 
285     /* ------------------------------------------------------------ */
286     public boolean isSuspended()
287     {
288         return false;
289     }
290 
291     /* ------------------------------------------------------------ */
292     public void onClose()
293     {
294         final boolean closed;
295         synchronized (this)
296         {
297             closed=_closeCode==0;
298             if (closed)
299                 _closeCode=WebSocketConnectionRFC6455.CLOSE_NO_CLOSE;
300         }
301         if (closed)
302             _webSocket.onClose(WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"closed");
303     }
304 
305     /* ------------------------------------------------------------ */
306     public void closeIn(int code,String message)
307     {
308         LOG.debug("ClosedIn {} {} {}",this,code,message);
309 
310         final boolean closed_out;
311         final boolean tell_app;
312         synchronized (this)
313         {
314             closed_out=_closedOut;
315             _closedIn=true;
316             tell_app=_closeCode==0;
317             if (tell_app)
318             {
319                 _closeCode=code;
320                 _closeMessage=message;
321             }
322         }
323 
324         try
325         {
326             if (!closed_out)
327                 closeOut(code,message);
328         }
329         finally
330         {
331             if  (tell_app)
332                 _webSocket.onClose(code,message);
333         }
334     }
335 
336     /* ------------------------------------------------------------ */
337     public void closeOut(int code,String message)
338     {
339         LOG.debug("ClosedOut {} {} {}",this,code,message);
340 
341         final boolean closed_out;
342         final boolean tell_app;
343         synchronized (this)
344         {
345             closed_out=_closedOut;
346             _closedOut=true;
347             tell_app=_closeCode==0;
348             if (tell_app)
349             {
350                 _closeCode=code;
351                 _closeMessage=message;
352             }
353         }
354 
355         try
356         {                    
357             if (tell_app)
358                 _webSocket.onClose(code,message);
359         }
360         finally
361         {
362             try
363             {
364                 if (!closed_out)
365                 {
366                     // Close code 1005/1006/1015 are never to be sent as a status over
367                     // a Close control frame. Code<-1 also means no node.
368 
369                     if (code < 0 || (code == WebSocketConnectionRFC6455.CLOSE_NO_CODE) || (code == WebSocketConnectionRFC6455.CLOSE_NO_CLOSE)
370                             || (code == WebSocketConnectionRFC6455.CLOSE_FAILED_TLS_HANDSHAKE))
371                     {
372                         code = -1;
373                     }
374                     else if (code == 0)
375                     {
376                         code = WebSocketConnectionRFC6455.CLOSE_NORMAL;
377                     }
378 
379                     byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
380                     bytes[0]=(byte)(code/0x100);
381                     bytes[1]=(byte)(code%0x100);
382                     _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionRFC6455.OP_CLOSE,bytes,0,code>0?bytes.length:0);
383                     _outbound.flush();
384                 }
385             }
386             catch(IOException e)
387             {
388                 LOG.ignore(e);
389             }
390         }
391     }
392 
393     public void shutdown()
394     {
395         final WebSocket.Connection connection = _connection;
396         if (connection != null)
397             connection.close(CLOSE_SHUTDOWN, null);
398     }
399 
400     /* ------------------------------------------------------------ */
401     public void fillBuffersFrom(Buffer buffer)
402     {
403         _parser.fill(buffer);
404     }
405 
406     /* ------------------------------------------------------------ */
407     private void checkWriteable()
408     {
409         if (!_outbound.isBufferEmpty() && _endp instanceof AsyncEndPoint)
410         {
411             ((AsyncEndPoint)_endp).scheduleWrite();
412         }
413     }
414 
415     protected void onFrameHandshake()
416     {
417         if (_onFrame != null)
418         {
419             _onFrame.onHandshake(_connection);
420         }
421     }
422 
423     protected void onWebSocketOpen()
424     {
425         _webSocket.onOpen(_connection);
426     }
427 
428     /* ------------------------------------------------------------ */
429     private class WSFrameConnection implements WebSocket.FrameConnection
430     {
431         private volatile boolean _disconnecting;
432 
433         /* ------------------------------------------------------------ */
434         public void sendMessage(String content) throws IOException
435         {
436             if (_closedOut)
437                 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
438             byte[] data = content.getBytes(StringUtil.__UTF8);
439             _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionRFC6455.OP_TEXT,data,0,data.length);
440             checkWriteable();
441         }
442 
443         /* ------------------------------------------------------------ */
444         public void sendMessage(byte[] content, int offset, int length) throws IOException
445         {
446             if (_closedOut)
447                 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
448             _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionRFC6455.OP_BINARY,content,offset,length);
449             checkWriteable();
450         }
451 
452         /* ------------------------------------------------------------ */
453         public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
454         {
455             if (_closedOut)
456                 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
457             _outbound.addFrame(flags,opcode,content,offset,length);
458             checkWriteable();
459         }
460 
461         /* ------------------------------------------------------------ */
462         public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException
463         {
464             // TODO: section 5.5 states that control frames MUST never be length > 125 bytes and MUST NOT be fragmented
465             if (_closedOut)
466                 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
467             _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
468             checkWriteable();
469         }
470 
471         /* ------------------------------------------------------------ */
472         public boolean isMessageComplete(byte flags)
473         {
474             return isLastFrame(flags);
475         }
476 
477         /* ------------------------------------------------------------ */
478         public boolean isOpen()
479         {
480             return _endp!=null&&_endp.isOpen();
481         }
482 
483         /* ------------------------------------------------------------ */
484         public void close(int code, String message)
485         {
486             if (_disconnecting)
487                 return;
488             _disconnecting=true;
489             WebSocketConnectionRFC6455.this.closeOut(code,message);
490         }
491 
492         /* ------------------------------------------------------------ */
493         public void setMaxIdleTime(int ms)
494         {
495             try
496             {
497                 _endp.setMaxIdleTime(ms);
498             }
499             catch(IOException e)
500             {
501                 LOG.warn(e);
502             }
503         }
504 
505         /* ------------------------------------------------------------ */
506         public void setMaxTextMessageSize(int size)
507         {
508             _maxTextMessageSize=size;
509         }
510 
511         /* ------------------------------------------------------------ */
512         public void setMaxBinaryMessageSize(int size)
513         {
514             _maxBinaryMessageSize=size;
515         }
516 
517         /* ------------------------------------------------------------ */
518         public int getMaxIdleTime()
519         {
520             return _endp.getMaxIdleTime();
521         }
522 
523         /* ------------------------------------------------------------ */
524         public int getMaxTextMessageSize()
525         {
526             return _maxTextMessageSize;
527         }
528 
529         /* ------------------------------------------------------------ */
530         public int getMaxBinaryMessageSize()
531         {
532             return _maxBinaryMessageSize;
533         }
534 
535         /* ------------------------------------------------------------ */
536         public String getProtocol()
537         {
538             return _protocol;
539         }
540 
541         /* ------------------------------------------------------------ */
542         public byte binaryOpcode()
543         {
544             return OP_BINARY;
545         }
546 
547         /* ------------------------------------------------------------ */
548         public byte textOpcode()
549         {
550             return OP_TEXT;
551         }
552 
553         /* ------------------------------------------------------------ */
554         public byte continuationOpcode()
555         {
556             return OP_CONTINUATION;
557         }
558 
559         /* ------------------------------------------------------------ */
560         public byte finMask()
561         {
562             return FLAG_FIN;
563         }
564 
565         /* ------------------------------------------------------------ */
566         public boolean isControl(byte opcode)
567         {
568             return isControlFrame(opcode);
569         }
570 
571         /* ------------------------------------------------------------ */
572         public boolean isText(byte opcode)
573         {
574             return opcode==OP_TEXT;
575         }
576 
577         /* ------------------------------------------------------------ */
578         public boolean isBinary(byte opcode)
579         {
580             return opcode==OP_BINARY;
581         }
582 
583         /* ------------------------------------------------------------ */
584         public boolean isContinuation(byte opcode)
585         {
586             return opcode==OP_CONTINUATION;
587         }
588 
589         /* ------------------------------------------------------------ */
590         public boolean isClose(byte opcode)
591         {
592             return opcode==OP_CLOSE;
593         }
594 
595         /* ------------------------------------------------------------ */
596         public boolean isPing(byte opcode)
597         {
598             return opcode==OP_PING;
599         }
600 
601         /* ------------------------------------------------------------ */
602         public boolean isPong(byte opcode)
603         {
604             return opcode==OP_PONG;
605         }
606 
607         /* ------------------------------------------------------------ */
608         public void disconnect()
609         {
610             close(CLOSE_NORMAL,null);
611         }
612 
613         /* ------------------------------------------------------------ */
614         public void close()
615         {
616             close(CLOSE_NORMAL,null);
617         }
618 
619         /* ------------------------------------------------------------ */
620         public void setAllowFrameFragmentation(boolean allowFragmentation)
621         {
622             _parser.setFakeFragments(allowFragmentation);
623         }
624 
625         /* ------------------------------------------------------------ */
626         public boolean isAllowFrameFragmentation()
627         {
628             return _parser.isFakeFragments();
629         }
630 
631         /* ------------------------------------------------------------ */
632         @Override
633         public String toString()
634         {
635             return String.format("%s@%x l(%s:%d)<->r(%s:%d)",
636                     getClass().getSimpleName(),
637                     hashCode(),
638                     _endp.getLocalAddr(),
639                     _endp.getLocalPort(),
640                     _endp.getRemoteAddr(),
641                     _endp.getRemotePort());
642         }
643     }
644 
645     /* ------------------------------------------------------------ */
646     /* ------------------------------------------------------------ */
647     /* ------------------------------------------------------------ */
648     private class WSFrameHandler implements WebSocketParser.FrameHandler
649     {
650         private static final int MAX_CONTROL_FRAME_PAYLOAD = 125;
651         private final Utf8StringBuilder _utf8 = new Utf8StringBuilder(512); // TODO configure initial capacity
652         private ByteArrayBuffer _aggregate;
653         private byte _opcode=-1;
654 
655         public void onFrame(final byte flags, final byte opcode, final Buffer buffer)
656         {
657             boolean lastFrame = isLastFrame(flags);
658 
659             synchronized(WebSocketConnectionRFC6455.this)
660             {
661                 // Ignore incoming after a close
662                 if (_closedIn)
663                     return;
664             }
665             try
666             {
667                 byte[] array=buffer.array();
668 
669                 if (isControlFrame(opcode) && buffer.length()>MAX_CONTROL_FRAME_PAYLOAD)
670                 {
671                     errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Control frame too large: " + buffer.length() + " > " + MAX_CONTROL_FRAME_PAYLOAD);
672                     return;
673                 }
674 
675                 // TODO: check extensions for RSV bit(s) meanings
676                 if ((flags&0x7)!=0)
677                 {
678                     errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"RSV bits set 0x"+Integer.toHexString(flags));
679                     return;
680                 }
681 
682                 // Ignore all frames after error close
683                 if (_closeCode!=0 && _closeCode!=CLOSE_NORMAL && opcode!=OP_CLOSE)
684                 {
685                     return;
686                 }
687 
688                 // Deliver frame if websocket is a FrameWebSocket
689                 if (_onFrame!=null)
690                 {
691                     if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
692                         return;
693                 }
694 
695                 if (_onControl!=null && isControlFrame(opcode))
696                 {
697                     if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
698                         return;
699                 }
700 
701                 switch(opcode)
702                 {
703                     case WebSocketConnectionRFC6455.OP_CONTINUATION:
704                     {
705                         if (_opcode==-1)
706                         {
707                             errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Bad Continuation");
708                             return;
709                         }
710 
711                         // If text, append to the message buffer
712                         if (_onTextMessage!=null && _opcode==WebSocketConnectionRFC6455.OP_TEXT)
713                         {
714                             if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
715                             {
716                                 // If this is the last fragment, deliver the text buffer
717                                 if (lastFrame)
718                                 {
719                                     _opcode=-1;
720                                     String msg =_utf8.toString();
721                                     _utf8.reset();
722                                     _onTextMessage.onMessage(msg);
723                                 }
724                             }
725                             else
726                                 textMessageTooLarge();
727                         }
728 
729                         if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
730                         {
731                             if (_aggregate!=null && checkBinaryMessageSize(_aggregate.length(),buffer.length()))
732                             {
733                                 _aggregate.put(buffer);
734 
735                                 // If this is the last fragment, deliver
736                                 if (lastFrame && _onBinaryMessage!=null)
737                                 {
738                                     try
739                                     {
740                                         _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
741                                     }
742                                     finally
743                                     {
744                                         _opcode=-1;
745                                         _aggregate.clear();
746                                     }
747                                 }
748                             }
749                         }
750                         break;
751                     }
752                     case WebSocketConnectionRFC6455.OP_PING:
753                     {
754                         LOG.debug("PING {}",this);
755                         if (!_closedOut)
756                         {
757                             _connection.sendControl(WebSocketConnectionRFC6455.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
758                         }
759                         break;
760                     }
761 
762                     case WebSocketConnectionRFC6455.OP_PONG:
763                     {
764                         LOG.debug("PONG {}",this);
765                         break;
766                     }
767 
768                     case WebSocketConnectionRFC6455.OP_CLOSE:
769                     {
770                         int code=WebSocketConnectionRFC6455.CLOSE_NO_CODE;
771                         String message=null;
772                         if (buffer.length()>=2)
773                         {
774                             code=(0xff&buffer.array()[buffer.getIndex()])*0x100+(0xff&buffer.array()[buffer.getIndex()+1]);
775 
776                             // Validate close status codes.
777                             if (code < WebSocketConnectionRFC6455.CLOSE_NORMAL ||
778                                 code == WebSocketConnectionRFC6455.CLOSE_UNDEFINED ||
779                                 code == WebSocketConnectionRFC6455.CLOSE_NO_CLOSE ||
780                                 code == WebSocketConnectionRFC6455.CLOSE_NO_CODE ||
781                                 ( code > 1011 && code <= 2999 ) ||
782                                 code >= 5000 )
783                             {
784                                 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Invalid close code " + code);
785                                 return;
786                             }
787 
788                             if (buffer.length()>2)
789                             {
790                                 if(_utf8.append(buffer.array(),buffer.getIndex()+2,buffer.length()-2,_connection.getMaxTextMessageSize()))
791                                 {
792                                     message = _utf8.toString();
793                                     _utf8.reset();
794                                 }
795                             }
796                         }
797                         else if(buffer.length() == 1)
798                         {
799                             // Invalid length. use status code 1002 (Protocol error)
800                             errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Invalid payload length of 1");
801                             return;
802                         }
803                         closeIn(code,message);
804                         break;
805                     }
806 
807                     case WebSocketConnectionRFC6455.OP_TEXT:
808                     {
809                         if (_opcode!=-1)
810                         {
811                             errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode));
812                             return;
813                         }
814 
815                         if(_onTextMessage!=null)
816                         {
817                             if (_connection.getMaxTextMessageSize()<=0)
818                             {
819                                 // No size limit, so handle only final frames
820                                 if (lastFrame)
821                                     _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
822                                 else
823                                 {
824                                     LOG.warn("Frame discarded. Text aggregation disabled for {}",_endp);
825                                     errorClose(WebSocketConnectionRFC6455.CLOSE_POLICY_VIOLATION,"Text frame aggregation disabled");
826                                 }
827                             }
828                             // append bytes to message buffer (if they fit)
829                             else if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
830                             {
831                                 if (lastFrame)
832                                 {
833                                     String msg =_utf8.toString();
834                                     _utf8.reset();
835                                     _onTextMessage.onMessage(msg);
836                                 }
837                                 else
838                                 {
839                                     _opcode=WebSocketConnectionRFC6455.OP_TEXT;
840                                 }
841                             }
842                             else
843                                 textMessageTooLarge();
844                         }
845                         break;
846                     }
847 
848                     case WebSocketConnectionRFC6455.OP_BINARY:
849                     {
850                         if (_opcode!=-1)
851                         {
852                             errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode));
853                             return;
854                         }
855 
856                         if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length()))
857                         {
858                             if (lastFrame)
859                             {
860                                 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
861                             }
862                             else if (_connection.getMaxBinaryMessageSize()>=0)
863                             {
864                                 _opcode=opcode;
865                                 // TODO use a growing buffer rather than a fixed one.
866                                 if (_aggregate==null)
867                                     _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
868                                 _aggregate.put(buffer);
869                             }
870                             else
871                             {
872                                 LOG.warn("Frame discarded. Binary aggregation disabed for {}",_endp);
873                                 errorClose(WebSocketConnectionRFC6455.CLOSE_POLICY_VIOLATION,"Binary frame aggregation disabled");
874                             }
875                         }
876                         break;
877                     }
878 
879                     default:
880                         errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Bad opcode 0x"+Integer.toHexString(opcode));
881                         break;
882                 }
883             }
884             catch(Utf8Appendable.NotUtf8Exception notUtf8)
885             {
886                 LOG.warn("NOTUTF8 - {} for {}",notUtf8,_endp, notUtf8);
887                 LOG.debug(notUtf8);
888                 errorClose(WebSocketConnectionRFC6455.CLOSE_BAD_PAYLOAD,"Invalid UTF-8");
889             }
890             catch(Throwable e)
891             {
892                 LOG.warn("{} for {}",e,_endp, e);
893                 LOG.debug(e);
894                 errorClose(WebSocketConnectionRFC6455.CLOSE_SERVER_ERROR,"Internal Server Error: "+e);
895             }
896         }
897 
898         private void errorClose(int code, String message)
899         {
900             _connection.close(code,message);
901 
902             // Brutally drop the connection
903             try
904             {
905                 _endp.close();
906             }
907             catch (IOException e)
908             {
909                 LOG.warn(e.toString());
910                 LOG.debug(e);
911             }
912         }
913 
914         private boolean checkBinaryMessageSize(int bufferLen, int length)
915         {
916             int max = _connection.getMaxBinaryMessageSize();
917             if (max>0 && (bufferLen+length)>max)
918             {
919                 LOG.warn("Binary message too large > {}B for {}",_connection.getMaxBinaryMessageSize(),_endp);
920                 _connection.close(WebSocketConnectionRFC6455.CLOSE_MESSAGE_TOO_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
921                 _opcode=-1;
922                 if (_aggregate!=null)
923                     _aggregate.clear();
924                 return false;
925             }
926             return true;
927         }
928 
929         private void textMessageTooLarge()
930         {
931             LOG.warn("Text message too large > {} chars for {}",_connection.getMaxTextMessageSize(),_endp);
932             _connection.close(WebSocketConnectionRFC6455.CLOSE_MESSAGE_TOO_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
933 
934             _opcode=-1;
935             _utf8.reset();
936         }
937 
938         public void close(int code,String message)
939         {
940             if (code!=CLOSE_NORMAL)
941                 LOG.warn("Close: "+code+" "+message);
942             _connection.close(code,message);
943         }
944 
945         @Override
946         public String toString()
947         {
948             return WebSocketConnectionRFC6455.this.toString()+"FH";
949         }
950     }
951 
952     /* ------------------------------------------------------------ */
953     public static String hashKey(String key)
954     {
955         try
956         {
957             MessageDigest md = MessageDigest.getInstance("SHA1");
958             md.update(key.getBytes("UTF-8"));
959             md.update(MAGIC);
960             return new String(B64Code.encode(md.digest()));
961         }
962         catch (Exception e)
963         {
964             throw new RuntimeException(e);
965         }
966     }
967 
968     /* ------------------------------------------------------------ */
969     @Override
970     public String toString()
971     {
972         return String.format("%s p=%s g=%s", getClass().getSimpleName(), _parser, _generator);
973     }
974 }