View Javadoc

1   /*******************************************************************************
2    * Copyright (c) 2011 Intalio, Inc.
3    * ======================================================================
4    * All rights reserved. This program and the accompanying materials
5    * are made available under the terms of the Eclipse Public License v1.0
6    * and Apache License v2.0 which accompanies this distribution.
7    *
8    *   The Eclipse Public License is available at
9    *   http://www.eclipse.org/legal/epl-v10.html
10   *
11   *   The Apache License v2.0 is available at
12   *   http://www.opensource.org/licenses/apache2.0.php
13   *
14   * You may elect to redistribute this code under either of these licenses.
15   *******************************************************************************/
16  // ========================================================================
17  // Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
18  // ------------------------------------------------------------------------
19  // All rights reserved. This program and the accompanying materials
20  // are made available under the terms of the Eclipse Public License v1.0
21  // and Apache License v2.0 which accompanies this distribution.
22  // The Eclipse Public License is available at
23  // http://www.eclipse.org/legal/epl-v10.html
24  // The Apache License v2.0 is available at
25  // http://www.opensource.org/licenses/apache2.0.php
26  // You may elect to redistribute this code under either of these licenses.
27  // ========================================================================
28  
29  package org.eclipse.jetty.websocket;
30  
31  import java.io.IOException;
32  import java.io.UnsupportedEncodingException;
33  import java.security.MessageDigest;
34  import java.util.Collections;
35  import java.util.List;
36  
37  import org.eclipse.jetty.io.AbstractConnection;
38  import org.eclipse.jetty.io.AsyncEndPoint;
39  import org.eclipse.jetty.io.Buffer;
40  import org.eclipse.jetty.io.ByteArrayBuffer;
41  import org.eclipse.jetty.io.Connection;
42  import org.eclipse.jetty.io.EndPoint;
43  import org.eclipse.jetty.util.B64Code;
44  import org.eclipse.jetty.util.StringUtil;
45  import org.eclipse.jetty.util.Utf8Appendable;
46  import org.eclipse.jetty.util.Utf8StringBuilder;
47  import org.eclipse.jetty.util.log.Log;
48  import org.eclipse.jetty.util.log.Logger;
49  import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
50  import org.eclipse.jetty.websocket.WebSocket.OnControl;
51  import org.eclipse.jetty.websocket.WebSocket.OnFrame;
52  import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
53  
54  
55  /* ------------------------------------------------------------ */
56  /**
57   * <pre>
58   *    0                   1                   2                   3
59   *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
60   *   +-+-+-+-+-------+-+-------------+-------------------------------+
61   *   |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
62   *   |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
63   *   |N|V|V|V|       |S|             |   (if payload len==126/127)   |
64   *   | |1|2|3|       |K|             |                               |
65   *   +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
66   *   |     Extended payload length continued, if payload len == 127  |
67   *   + - - - - - - - - - - - - - - - +-------------------------------+
68   *   |                               |Masking-key, if MASK set to 1  |
69   *   +-------------------------------+-------------------------------+
70   *   | Masking-key (continued)       |          Payload Data         |
71   *   +-------------------------------- - - - - - - - - - - - - - - - +
72   *   :                     Payload Data continued ...                :
73   *   + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
74   *   |                     Payload Data continued ...                |
75   *   +---------------------------------------------------------------+
76   * </pre>
77   */
78  public class WebSocketConnectionRFC6455 extends AbstractConnection implements WebSocketConnection
79  {
80      private static final Logger LOG = Log.getLogger(WebSocketConnectionRFC6455.class);
81  
82      final static byte OP_CONTINUATION = 0x00;
83      final static byte OP_TEXT = 0x01;
84      final static byte OP_BINARY = 0x02;
85      final static byte OP_EXT_DATA = 0x03;
86  
87      final static byte OP_CONTROL = 0x08;
88      final static byte OP_CLOSE = 0x08;
89      final static byte OP_PING = 0x09;
90      final static byte OP_PONG = 0x0A;
91      final static byte OP_EXT_CTRL = 0x0B;
92  
93      final static int CLOSE_NORMAL=1000;
94      final static int CLOSE_SHUTDOWN=1001;
95      final static int CLOSE_PROTOCOL=1002;
96      final static int CLOSE_BAD_DATA=1003;
97      final static int CLOSE_UNDEFINED=1004;
98      final static int CLOSE_NO_CODE=1005;
99      final static int CLOSE_NO_CLOSE=1006;
100     final static int CLOSE_BAD_PAYLOAD=1007;
101     final static int CLOSE_POLICY_VIOLATION=1008;
102     final static int CLOSE_MESSAGE_TOO_LARGE=1009;
103     final static int CLOSE_REQUIRED_EXTENSION=1010;
104     final static int CLOSE_SERVER_ERROR=1011;
105     final static int CLOSE_FAILED_TLS_HANDSHAKE=1015;
106 
107     final static int FLAG_FIN=0x8;
108 
109     // Per RFC 6455, section 1.3 - Opening Handshake - this version is "13"
110     final static int VERSION=13;
111 
112     static boolean isLastFrame(byte flags)
113     {
114         return (flags&FLAG_FIN)!=0;
115     }
116 
117     static boolean isControlFrame(byte opcode)
118     {
119         return (opcode&OP_CONTROL)!=0;
120     }
121 
122     private final static byte[] MAGIC;
123     private final List<Extension> _extensions;
124     private final WebSocketParserRFC6455 _parser;
125     private final WebSocketGeneratorRFC6455 _generator;
126     private final WebSocketGenerator _outbound;
127     private final WebSocket _webSocket;
128     private final OnFrame _onFrame;
129     private final OnBinaryMessage _onBinaryMessage;
130     private final OnTextMessage _onTextMessage;
131     private final OnControl _onControl;
132     private final String _protocol;
133     private final int _draft;
134     private final ClassLoader _context;
135     private volatile int _closeCode;
136     private volatile String _closeMessage;
137     private volatile boolean _closedIn;
138     private volatile boolean _closedOut;
139     private int _maxTextMessageSize=-1;
140     private int _maxBinaryMessageSize=-1;
141 
142     static
143     {
144         try
145         {
146             MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
147         }
148         catch (UnsupportedEncodingException e)
149         {
150             throw new RuntimeException(e);
151         }
152     }
153 
154     private final WebSocket.FrameConnection _connection = new WSFrameConnection();
155 
156 
157     /* ------------------------------------------------------------ */
158     public WebSocketConnectionRFC6455(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft)
159         throws IOException
160     {
161         this(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol,extensions,draft,null);
162     }
163 
164     /* ------------------------------------------------------------ */
165     public WebSocketConnectionRFC6455(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List<Extension> extensions,int draft, MaskGen maskgen)
166         throws IOException
167     {
168         super(endpoint,timestamp);
169 
170         _context=Thread.currentThread().getContextClassLoader();
171 
172         _draft=draft;
173         _endp.setMaxIdleTime(maxIdleTime);
174 
175         _webSocket = websocket;
176         _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
177         _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
178         _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
179         _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
180         _generator = new WebSocketGeneratorRFC6455(buffers, _endp,maskgen);
181 
182         _extensions=extensions;
183         WebSocketParser.FrameHandler frameHandler = new WSFrameHandler();
184         if (_extensions!=null)
185         {
186             int e=0;
187             for (Extension extension : _extensions)
188             {
189                 extension.bind(
190                         _connection,
191                         e==extensions.size()-1? frameHandler :extensions.get(e+1),
192                         e==0?_generator:extensions.get(e-1));
193                 e++;
194             }
195         }
196 
197         _outbound=(_extensions==null||_extensions.size()==0)?_generator:extensions.get(extensions.size()-1);
198         WebSocketParser.FrameHandler inbound = (_extensions == null || _extensions.size() == 0) ? frameHandler : extensions.get(0);
199 
200         _parser = new WebSocketParserRFC6455(buffers, endpoint, inbound,maskgen==null);
201 
202         _protocol=protocol;
203 
204     }
205 
206     /* ------------------------------------------------------------ */
207     public WebSocket.Connection getConnection()
208     {
209         return _connection;
210     }
211 
212     /* ------------------------------------------------------------ */
213     public List<Extension> getExtensions()
214     {
215         if (_extensions==null)
216             return Collections.emptyList();
217 
218         return _extensions;
219     }
220 
221     /* ------------------------------------------------------------ */
222     public Connection handle() throws IOException
223     {
224         Thread current = Thread.currentThread();
225         ClassLoader oldcontext = current.getContextClassLoader();
226         current.setContextClassLoader(_context);
227         try
228         {
229             // handle the framing protocol
230             boolean progress=true;
231 
232             while (progress)
233             {
234                 int flushed=_generator.flushBuffer();
235                 int filled=_parser.parseNext();
236 
237                 progress = flushed>0 || filled>0;
238                 _endp.flush();
239 
240                 if (_endp instanceof AsyncEndPoint && ((AsyncEndPoint)_endp).hasProgressed())
241                     progress=true;
242             }
243         }
244         catch(IOException e)
245         {
246             try
247             {
248                 if (_endp.isOpen())
249                     _endp.close();
250             }
251             catch(IOException e2)
252             {
253                 LOG.ignore(e2);
254             }
255             throw e;
256         }
257         finally
258         {
259             current.setContextClassLoader(oldcontext);
260             _parser.returnBuffer();
261             _generator.returnBuffer();
262             if (_endp.isOpen())
263             {
264                 if (_closedIn && _closedOut && _outbound.isBufferEmpty())
265                     _endp.close();
266                 else if (_endp.isInputShutdown() && !_closedIn)
267                     closeIn(CLOSE_NO_CLOSE,null);
268                 else
269                     checkWriteable();
270             }
271         }
272         return this;
273     }
274 
275     /* ------------------------------------------------------------ */
276     public void onInputShutdown() throws IOException
277     {
278         if (!_closedIn)
279             _endp.close();
280     }
281 
282     /* ------------------------------------------------------------ */
283     public boolean isIdle()
284     {
285         return _parser.isBufferEmpty() && _outbound.isBufferEmpty();
286     }
287 
288     /* ------------------------------------------------------------ */
289     @Override
290     public void onIdleExpired(long idleForMs)
291     {
292         closeOut(WebSocketConnectionRFC6455.CLOSE_NORMAL,"Idle for "+idleForMs+"ms > "+_endp.getMaxIdleTime()+"ms");
293     }
294 
295     /* ------------------------------------------------------------ */
296     public boolean isSuspended()
297     {
298         return false;
299     }
300 
301     /* ------------------------------------------------------------ */
302     public void onClose()
303     {
304         final boolean closed;
305         synchronized (this)
306         {
307             closed=_closeCode==0;
308             if (closed)
309                 _closeCode=WebSocketConnectionRFC6455.CLOSE_NO_CLOSE;
310         }
311         if (closed)
312             _webSocket.onClose(WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"closed");
313     }
314 
315     /* ------------------------------------------------------------ */
316     public void closeIn(int code,String message)
317     {
318         LOG.debug("ClosedIn {} {} {}",this,code,message);
319 
320         final boolean closed_out;
321         final boolean tell_app;
322         synchronized (this)
323         {
324             closed_out=_closedOut;
325             _closedIn=true;
326             tell_app=_closeCode==0;
327             if (tell_app)
328             {
329                 _closeCode=code;
330                 _closeMessage=message;
331             }
332         }
333 
334         try
335         {
336             if (tell_app)
337                 _webSocket.onClose(code,message);
338         }
339         finally
340         {
341             if (!closed_out)
342                 closeOut(code,message);
343         }
344     }
345 
346     /* ------------------------------------------------------------ */
347     public void closeOut(int code,String message)
348     {
349         LOG.debug("ClosedOut {} {} {}",this,code,message);
350 
351         final boolean closed_out;
352         final boolean tell_app;
353         synchronized (this)
354         {
355             closed_out=_closedOut;
356             _closedOut=true;
357             tell_app=_closeCode==0;
358             if (tell_app)
359             {
360                 _closeCode=code;
361                 _closeMessage=message;
362             }
363         }
364 
365         try
366         {
367             if (tell_app)
368                 _webSocket.onClose(code,message);
369         }
370         finally
371         {
372             try
373             {
374                 if (!closed_out)
375                 {
376                     // Close code 1005/1006/1015 are never to be sent as a status over
377                     // a Close control frame. Code<-1 also means no node.
378 
379                     if (code < 0 || (code == WebSocketConnectionRFC6455.CLOSE_NO_CODE) || (code == WebSocketConnectionRFC6455.CLOSE_NO_CLOSE)
380                             || (code == WebSocketConnectionRFC6455.CLOSE_FAILED_TLS_HANDSHAKE))
381                     {
382                         code = -1;
383                     }
384                     else if (code == 0)
385                     {
386                         code = WebSocketConnectionRFC6455.CLOSE_NORMAL;
387                     }
388 
389                     byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
390                     bytes[0]=(byte)(code/0x100);
391                     bytes[1]=(byte)(code%0x100);
392                     _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionRFC6455.OP_CLOSE,bytes,0,code>0?bytes.length:0);
393                     _outbound.flush();
394                 }
395             }
396             catch(IOException e)
397             {
398                 LOG.ignore(e);
399             }
400         }
401     }
402 
403     public void shutdown()
404     {
405         final WebSocket.Connection connection = _connection;
406         if (connection != null)
407             connection.close(CLOSE_SHUTDOWN, null);
408     }
409 
410     /* ------------------------------------------------------------ */
411     public void fillBuffersFrom(Buffer buffer)
412     {
413         _parser.fill(buffer);
414     }
415 
416     /* ------------------------------------------------------------ */
417     private void checkWriteable()
418     {
419         if (!_outbound.isBufferEmpty() && _endp instanceof AsyncEndPoint)
420         {
421             ((AsyncEndPoint)_endp).scheduleWrite();
422         }
423     }
424 
425     protected void onFrameHandshake()
426     {
427         if (_onFrame != null)
428         {
429             _onFrame.onHandshake(_connection);
430         }
431     }
432 
433     protected void onWebSocketOpen()
434     {
435         _webSocket.onOpen(_connection);
436     }
437 
438     /* ------------------------------------------------------------ */
439     private class WSFrameConnection implements WebSocket.FrameConnection
440     {
441         private volatile boolean _disconnecting;
442 
443         /* ------------------------------------------------------------ */
444         public void sendMessage(String content) throws IOException
445         {
446             if (_closedOut)
447                 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
448             byte[] data = content.getBytes(StringUtil.__UTF8);
449             _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionRFC6455.OP_TEXT,data,0,data.length);
450             checkWriteable();
451         }
452 
453         /* ------------------------------------------------------------ */
454         public void sendMessage(byte[] content, int offset, int length) throws IOException
455         {
456             if (_closedOut)
457                 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
458             _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionRFC6455.OP_BINARY,content,offset,length);
459             checkWriteable();
460         }
461 
462         /* ------------------------------------------------------------ */
463         public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
464         {
465             if (_closedOut)
466                 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
467             _outbound.addFrame(flags,opcode,content,offset,length);
468             checkWriteable();
469         }
470 
471         /* ------------------------------------------------------------ */
472         public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException
473         {
474             // TODO: section 5.5 states that control frames MUST never be length > 125 bytes and MUST NOT be fragmented
475             if (_closedOut)
476                 throw new IOException("closedOut "+_closeCode+":"+_closeMessage);
477             _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
478             checkWriteable();
479         }
480 
481         /* ------------------------------------------------------------ */
482         public boolean isMessageComplete(byte flags)
483         {
484             return isLastFrame(flags);
485         }
486 
487         /* ------------------------------------------------------------ */
488         public boolean isOpen()
489         {
490             return _endp!=null&&_endp.isOpen();
491         }
492 
493         /* ------------------------------------------------------------ */
494         public void close(int code, String message)
495         {
496             if (_disconnecting)
497                 return;
498             _disconnecting=true;
499             WebSocketConnectionRFC6455.this.closeOut(code,message);
500         }
501 
502         /* ------------------------------------------------------------ */
503         public void setMaxIdleTime(int ms)
504         {
505             try
506             {
507                 _endp.setMaxIdleTime(ms);
508             }
509             catch(IOException e)
510             {
511                 LOG.warn(e);
512             }
513         }
514 
515         /* ------------------------------------------------------------ */
516         public void setMaxTextMessageSize(int size)
517         {
518             _maxTextMessageSize=size;
519         }
520 
521         /* ------------------------------------------------------------ */
522         public void setMaxBinaryMessageSize(int size)
523         {
524             _maxBinaryMessageSize=size;
525         }
526 
527         /* ------------------------------------------------------------ */
528         public int getMaxIdleTime()
529         {
530             return _endp.getMaxIdleTime();
531         }
532 
533         /* ------------------------------------------------------------ */
534         public int getMaxTextMessageSize()
535         {
536             return _maxTextMessageSize;
537         }
538 
539         /* ------------------------------------------------------------ */
540         public int getMaxBinaryMessageSize()
541         {
542             return _maxBinaryMessageSize;
543         }
544 
545         /* ------------------------------------------------------------ */
546         public String getProtocol()
547         {
548             return _protocol;
549         }
550 
551         /* ------------------------------------------------------------ */
552         public byte binaryOpcode()
553         {
554             return OP_BINARY;
555         }
556 
557         /* ------------------------------------------------------------ */
558         public byte textOpcode()
559         {
560             return OP_TEXT;
561         }
562 
563         /* ------------------------------------------------------------ */
564         public byte continuationOpcode()
565         {
566             return OP_CONTINUATION;
567         }
568 
569         /* ------------------------------------------------------------ */
570         public byte finMask()
571         {
572             return FLAG_FIN;
573         }
574 
575         /* ------------------------------------------------------------ */
576         public boolean isControl(byte opcode)
577         {
578             return isControlFrame(opcode);
579         }
580 
581         /* ------------------------------------------------------------ */
582         public boolean isText(byte opcode)
583         {
584             return opcode==OP_TEXT;
585         }
586 
587         /* ------------------------------------------------------------ */
588         public boolean isBinary(byte opcode)
589         {
590             return opcode==OP_BINARY;
591         }
592 
593         /* ------------------------------------------------------------ */
594         public boolean isContinuation(byte opcode)
595         {
596             return opcode==OP_CONTINUATION;
597         }
598 
599         /* ------------------------------------------------------------ */
600         public boolean isClose(byte opcode)
601         {
602             return opcode==OP_CLOSE;
603         }
604 
605         /* ------------------------------------------------------------ */
606         public boolean isPing(byte opcode)
607         {
608             return opcode==OP_PING;
609         }
610 
611         /* ------------------------------------------------------------ */
612         public boolean isPong(byte opcode)
613         {
614             return opcode==OP_PONG;
615         }
616 
617         /* ------------------------------------------------------------ */
618         public void disconnect()
619         {
620             close(CLOSE_NORMAL,null);
621         }
622 
623         /* ------------------------------------------------------------ */
624         public void close()
625         {
626             close(CLOSE_NORMAL,null);
627         }
628 
629         /* ------------------------------------------------------------ */
630         public void setAllowFrameFragmentation(boolean allowFragmentation)
631         {
632             _parser.setFakeFragments(allowFragmentation);
633         }
634 
635         /* ------------------------------------------------------------ */
636         public boolean isAllowFrameFragmentation()
637         {
638             return _parser.isFakeFragments();
639         }
640 
641         /* ------------------------------------------------------------ */
642         @Override
643         public String toString()
644         {
645             return String.format("%s@%x l(%s:%d)<->r(%s:%d)",
646                     getClass().getSimpleName(),
647                     hashCode(),
648                     _endp.getLocalAddr(),
649                     _endp.getLocalPort(),
650                     _endp.getRemoteAddr(),
651                     _endp.getRemotePort());
652         }
653     }
654 
655     /* ------------------------------------------------------------ */
656     /* ------------------------------------------------------------ */
657     /* ------------------------------------------------------------ */
658     private class WSFrameHandler implements WebSocketParser.FrameHandler
659     {
660         private static final int MAX_CONTROL_FRAME_PAYLOAD = 125;
661         private final Utf8StringBuilder _utf8 = new Utf8StringBuilder(512); // TODO configure initial capacity
662         private ByteArrayBuffer _aggregate;
663         private byte _opcode=-1;
664 
665         public void onFrame(final byte flags, final byte opcode, final Buffer buffer)
666         {
667             boolean lastFrame = isLastFrame(flags);
668 
669             synchronized(WebSocketConnectionRFC6455.this)
670             {
671                 // Ignore incoming after a close
672                 if (_closedIn)
673                     return;
674             }
675             try
676             {
677                 byte[] array=buffer.array();
678 
679                 if (isControlFrame(opcode) && buffer.length()>MAX_CONTROL_FRAME_PAYLOAD)
680                 {
681                     errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Control frame too large: " + buffer.length() + " > " + MAX_CONTROL_FRAME_PAYLOAD);
682                     return;
683                 }
684 
685                 // TODO: check extensions for RSV bit(s) meanings
686                 if ((flags&0x7)!=0)
687                 {
688                     errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"RSV bits set 0x"+Integer.toHexString(flags));
689                     return;
690                 }
691 
692                 // Ignore all frames after error close
693                 if (_closeCode!=0 && _closeCode!=CLOSE_NORMAL && opcode!=OP_CLOSE)
694                 {
695                     return;
696                 }
697 
698                 // Deliver frame if websocket is a FrameWebSocket
699                 if (_onFrame!=null)
700                 {
701                     if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
702                         return;
703                 }
704 
705                 if (_onControl!=null && isControlFrame(opcode))
706                 {
707                     if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
708                         return;
709                 }
710 
711                 switch(opcode)
712                 {
713                     case WebSocketConnectionRFC6455.OP_CONTINUATION:
714                     {
715                         if (_opcode==-1)
716                         {
717                             errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Bad Continuation");
718                             return;
719                         }
720 
721                         // If text, append to the message buffer
722                         if (_onTextMessage!=null && _opcode==WebSocketConnectionRFC6455.OP_TEXT)
723                         {
724                             if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
725                             {
726                                 // If this is the last fragment, deliver the text buffer
727                                 if (lastFrame)
728                                 {
729                                     _opcode=-1;
730                                     String msg =_utf8.toString();
731                                     _utf8.reset();
732                                     _onTextMessage.onMessage(msg);
733                                 }
734                             }
735                             else
736                                 textMessageTooLarge();
737                         }
738 
739                         if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
740                         {
741                             if (_aggregate!=null && checkBinaryMessageSize(_aggregate.length(),buffer.length()))
742                             {
743                                 _aggregate.put(buffer);
744 
745                                 // If this is the last fragment, deliver
746                                 if (lastFrame && _onBinaryMessage!=null)
747                                 {
748                                     try
749                                     {
750                                         _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
751                                     }
752                                     finally
753                                     {
754                                         _opcode=-1;
755                                         _aggregate.clear();
756                                     }
757                                 }
758                             }
759                         }
760                         break;
761                     }
762                     case WebSocketConnectionRFC6455.OP_PING:
763                     {
764                         LOG.debug("PING {}",this);
765                         if (!_closedOut)
766                         {
767                             _connection.sendControl(WebSocketConnectionRFC6455.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
768                         }
769                         break;
770                     }
771 
772                     case WebSocketConnectionRFC6455.OP_PONG:
773                     {
774                         LOG.debug("PONG {}",this);
775                         break;
776                     }
777 
778                     case WebSocketConnectionRFC6455.OP_CLOSE:
779                     {
780                         int code=WebSocketConnectionRFC6455.CLOSE_NO_CODE;
781                         String message=null;
782                         if (buffer.length()>=2)
783                         {
784                             code=(0xff&buffer.array()[buffer.getIndex()])*0x100+(0xff&buffer.array()[buffer.getIndex()+1]);
785 
786                             // Validate close status codes.
787                             if (code < WebSocketConnectionRFC6455.CLOSE_NORMAL ||
788                                 code == WebSocketConnectionRFC6455.CLOSE_UNDEFINED ||
789                                 code == WebSocketConnectionRFC6455.CLOSE_NO_CLOSE ||
790                                 code == WebSocketConnectionRFC6455.CLOSE_NO_CODE ||
791                                 ( code > 1011 && code <= 2999 ) ||
792                                 code >= 5000 )
793                             {
794                                 errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Invalid close code " + code);
795                                 return;
796                             }
797 
798                             if (buffer.length()>2)
799                             {
800                                 if(_utf8.append(buffer.array(),buffer.getIndex()+2,buffer.length()-2,_connection.getMaxTextMessageSize()))
801                                 {
802                                     message = _utf8.toString();
803                                     _utf8.reset();
804                                 }
805                             }
806                         }
807                         else if(buffer.length() == 1)
808                         {
809                             // Invalid length. use status code 1002 (Protocol error)
810                             errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Invalid payload length of 1");
811                             return;
812                         }
813                         closeIn(code,message);
814                         break;
815                     }
816 
817                     case WebSocketConnectionRFC6455.OP_TEXT:
818                     {
819                         if (_opcode!=-1)
820                         {
821                             errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode));
822                             return;
823                         }
824 
825                         if(_onTextMessage!=null)
826                         {
827                             if (_connection.getMaxTextMessageSize()<=0)
828                             {
829                                 // No size limit, so handle only final frames
830                                 if (lastFrame)
831                                     _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
832                                 else
833                                 {
834                                     LOG.warn("Frame discarded. Text aggregation disabled for {}",_endp);
835                                     errorClose(WebSocketConnectionRFC6455.CLOSE_POLICY_VIOLATION,"Text frame aggregation disabled");
836                                 }
837                             }
838                             // append bytes to message buffer (if they fit)
839                             else if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
840                             {
841                                 if (lastFrame)
842                                 {
843                                     String msg =_utf8.toString();
844                                     _utf8.reset();
845                                     _onTextMessage.onMessage(msg);
846                                 }
847                                 else
848                                 {
849                                     _opcode=WebSocketConnectionRFC6455.OP_TEXT;
850                                 }
851                             }
852                             else
853                                 textMessageTooLarge();
854                         }
855                         break;
856                     }
857 
858                     case WebSocketConnectionRFC6455.OP_BINARY:
859                     {
860                         if (_opcode!=-1)
861                         {
862                             errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Expected Continuation"+Integer.toHexString(opcode));
863                             return;
864                         }
865 
866                         if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length()))
867                         {
868                             if (lastFrame)
869                             {
870                                 _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
871                             }
872                             else if (_connection.getMaxBinaryMessageSize()>=0)
873                             {
874                                 _opcode=opcode;
875                                 // TODO use a growing buffer rather than a fixed one.
876                                 if (_aggregate==null)
877                                     _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
878                                 _aggregate.put(buffer);
879                             }
880                             else
881                             {
882                                 LOG.warn("Frame discarded. Binary aggregation disabed for {}",_endp);
883                                 errorClose(WebSocketConnectionRFC6455.CLOSE_POLICY_VIOLATION,"Binary frame aggregation disabled");
884                             }
885                         }
886                         break;
887                     }
888 
889                     default:
890                         errorClose(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,"Bad opcode 0x"+Integer.toHexString(opcode));
891                         break;
892                 }
893             }
894             catch(Utf8Appendable.NotUtf8Exception notUtf8)
895             {
896                 LOG.warn("NOTUTF8 - {} for {}",notUtf8,_endp, notUtf8);
897                 LOG.debug(notUtf8);
898                 errorClose(WebSocketConnectionRFC6455.CLOSE_BAD_PAYLOAD,"Invalid UTF-8");
899             }
900             catch(Throwable e)
901             {
902                 LOG.warn("{} for {}",e,_endp, e);
903                 LOG.debug(e);
904                 errorClose(WebSocketConnectionRFC6455.CLOSE_SERVER_ERROR,"Internal Server Error: "+e);
905             }
906         }
907 
908         private void errorClose(int code, String message)
909         {
910             _connection.close(code,message);
911 
912             // Brutally drop the connection
913             try
914             {
915                 _endp.close();
916             }
917             catch (IOException e)
918             {
919                 LOG.warn(e.toString());
920                 LOG.debug(e);
921             }
922         }
923 
924         private boolean checkBinaryMessageSize(int bufferLen, int length)
925         {
926             int max = _connection.getMaxBinaryMessageSize();
927             if (max>0 && (bufferLen+length)>max)
928             {
929                 LOG.warn("Binary message too large > {}B for {}",_connection.getMaxBinaryMessageSize(),_endp);
930                 _connection.close(WebSocketConnectionRFC6455.CLOSE_MESSAGE_TOO_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
931                 _opcode=-1;
932                 if (_aggregate!=null)
933                     _aggregate.clear();
934                 return false;
935             }
936             return true;
937         }
938 
939         private void textMessageTooLarge()
940         {
941             LOG.warn("Text message too large > {} chars for {}",_connection.getMaxTextMessageSize(),_endp);
942             _connection.close(WebSocketConnectionRFC6455.CLOSE_MESSAGE_TOO_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
943 
944             _opcode=-1;
945             _utf8.reset();
946         }
947 
948         public void close(int code,String message)
949         {
950             if (code!=CLOSE_NORMAL)
951                 LOG.warn("Close: "+code+" "+message);
952             _connection.close(code,message);
953         }
954 
955         @Override
956         public String toString()
957         {
958             return WebSocketConnectionRFC6455.this.toString()+"FH";
959         }
960     }
961 
962     /* ------------------------------------------------------------ */
963     public static String hashKey(String key)
964     {
965         try
966         {
967             MessageDigest md = MessageDigest.getInstance("SHA1");
968             md.update(key.getBytes("UTF-8"));
969             md.update(MAGIC);
970             return new String(B64Code.encode(md.digest()));
971         }
972         catch (Exception e)
973         {
974             throw new RuntimeException(e);
975         }
976     }
977 
978     /* ------------------------------------------------------------ */
979     @Override
980     public String toString()
981     {
982         return String.format("%s p=%s g=%s", getClass().getSimpleName(), _parser, _generator);
983     }
984 }